• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
You are now the owner of this repo.

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.84
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
/* Selects which border of a time window an interpolation step refers to. */
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,  // start border (tracked by SResultRow::startInterp in this file)
34
  RESULT_ROW_END_INTERP = 2,    // end border (tracked by SResultRow::endInterp in this file)
35
} SResultTsInterpType;
36

37
/* Payload of a node in SResultRowInfo::openWindow: identifies one result row and the group it belongs to. */
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;      // page id / offset of the result row
39
  uint64_t           groupId;  // group id the window was opened for
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
2,147,483,647✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
2,147,483,647✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
2,147,483,647✔
56
    *pResult = NULL;
754,983✔
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
×
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
2,147,483,647✔
63
  *pResult = pResultRow;
2,147,483,647✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
2,147,483,647✔
65
}
66

67
/**
 * @brief Account one more row (timestamp ts) to the window currently tracked by pRowSup.
 *
 * Updates the window end key, the previous timestamp and the group id, and increments the row count. When
 * hasContinuousNullRows() reports pending null rows, those are added to the row count as well and the null-row
 * counter is reset. rowIndex is not used.
 */
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, int32_t rowIndex, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
  pRowSup->numOfRows += 1;

  if (!hasContinuousNullRows(pRowSup)) {
    return;
  }

  // rows with a null state column that are enclosed by rows of the same state belong to the current window
  pRowSup->numOfRows += pRowSup->numNullRows;
  resetNumNullRows(pRowSup);
}
79

80
/**
 * @brief Start tracking a new window at row rowIndex: remember the start row and its timestamp, clear the row
 *        count, store the group id and reset the null-row counter.
 */
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->win.skey = tsList[rowIndex];
  pRowSup->startRowIndex = rowIndex;
  pRowSup->numOfRows = 0;
  resetNumNullRows(pRowSup);
}
87

88
/**
 * @brief Count how many rows, starting at index pos, can be consumed up to and including key ekey.
 *
 * searchFn is run on the sub-array beginning at pData[pos] (numOfRows - pos entries); its result is an index
 * relative to pos. Every following row whose key equals ekey is counted too.
 *
 * The scan over equal keys is bounded by numOfRows, so a run of keys equal to ekey that extends to the last row of
 * the block no longer reads past the end of pData. The ascending and descending cases run exactly the same steps
 * (the order is only forwarded to searchFn), so they share one code path.
 *
 * @param numOfRows number of entries in pData
 * @param searchFn  search routine; a negative result means "no position found"
 * @param ekey      end key of the window
 * @param pos       index of the first row to consider
 * @param order     scan order, passed through to searchFn
 * @param pData     timestamp column
 * @return number of rows to move forward, 0 when searchFn finds no position
 */
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
  int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
  if (end < 0) {
    return 0;
  }

  int32_t forwardRows = end;

  // include all rows that carry exactly the end key, never stepping outside the block
  for (int32_t idx = end + pos; idx < numOfRows && pData[idx] == ekey; ++idx) {
    forwardRows += 1;
  }

  return forwardRows;
}
124

125
/**
 * @brief Binary search over an array of TSKEY values.
 *
 * For TSDB_ORDER_DESC the first position is returned as soon as key >= keys[first]; for any other order as soon as
 * key <= keys[first]. A key equal to the last or the middle element returns that index. When the key lies beyond
 * the current last element, the index following it is returned, or -1 if that index is outside the array.
 *
 * @param pValue array of TSKEY values
 * @param num    number of entries; -1 is returned when num <= 0
 * @param key    key to look for
 * @param order  TSDB_ORDER_DESC for a descending array, anything else is treated as ascending
 * @return index into the array, or -1
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  TSKEY*  keys = (TSKEY*)pValue;
  int32_t lo = 0;
  int32_t hi = num - 1;
  bool    desc = (order == TSDB_ORDER_DESC);

  for (;;) {
    if (desc) {
      if (key >= keys[lo]) return lo;
    } else {
      if (key <= keys[lo]) return lo;
    }

    if (key == keys[hi]) return hi;

    // key lies past the current tail: the answer is the element right after it, if there is one
    bool pastTail = desc ? (key < keys[hi]) : (key > keys[hi]);
    if (pastTail) {
      int32_t next = hi + 1;
      return (next >= num) ? -1 : next;
    }

    int32_t span = hi - lo + 1;
    int32_t mid = (span >> 1) + lo;
    if (key == keys[mid]) {
      return mid;
    }

    bool moveLower = desc ? (key < keys[mid]) : (key > keys[mid]);
    if (moveLower) {
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
}
193

194
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
2,147,483,647✔
195
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
196
  int32_t num = -1;
2,147,483,647✔
197
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
2,147,483,647✔
198

199
  if (order == TSDB_ORDER_ASC) {
2,147,483,647✔
200
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
2,147,483,647✔
201
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
2,147,483,647✔
202
      if (item != NULL) {
2,147,483,647✔
203
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
204
      }
205
    } else {
206
      num = pDataBlockInfo->rows - startPos;
15,884,987✔
207
      if (item != NULL) {
23,486,405✔
208
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
209
      }
210
    }
211
  } else {  // desc
212
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
396,804,864✔
213
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
405,204,964✔
214
      if (item != NULL) {
404,780,754✔
215
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
216
      }
217
    } else {
218
      num = pDataBlockInfo->rows - startPos;
660,136✔
219
      if (item != NULL) {
1,620,077✔
220
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
221
      }
222
    }
223
  }
224

225
  return num;
2,147,483,647✔
226
}
227

228
/**
 * @brief Compute the interpolated value at windowKey for every interval-interpolation function in pSup.
 *
 * For each such function the value of its first parameter column is read at the previous row and at the current
 * row. When prevRowIndex is -1 the previous value is taken from pPrevValues instead (entries are consumed starting
 * at index 1, one per interpolation function). Unless the function is an "elapsed" function, the value at
 * windowKey is linearly interpolated between (prevTs, v1) and (curTs, v2). The result is stored in the context's
 * start or end point depending on type. Contexts of non-interpolation functions get start.key = INT64_MIN.
 */
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t prevValueIdx = 1;
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    SqlFunctionCtx* pFuncCtx = &pCtx[k];

    if (!fmIsIntervalInterpoFunc(pFuncCtx->functionId)) {
      pFuncCtx->start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pParam = &pFuncCtx->param[0];
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

    double v1 = 0, v2 = 0, v = 0;
    if (prevRowIndex == -1) {
      // the previous row lives in an earlier block; use the saved value
      SGroupKeys* p = taosArrayGet(pPrevValues, prevValueIdx);
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
    } else {
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
                     typeGetTypeModFromColInfo(&pColInfo->info));
    }

    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
                   typeGetTypeModFromColInfo(&pColInfo->info));

    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
    SPoint point = (SPoint){.key = windowKey, .val = &v};

    if (!fmIsElapsedFunc(pFuncCtx->functionId)) {
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
    }

    if (type == RESULT_ROW_START_INTERP) {
      pFuncCtx->start.key = point.key;
      pFuncCtx->start.val = v;
    } else {
      pFuncCtx->end.key = point.key;
      pFuncCtx->end.val = v;
    }

    prevValueIdx += 1;
  }
}
299

300
/**
 * @brief Mark one border of every function context as "no interpolation" by setting its key to INT64_MIN.
 *
 * @param type RESULT_ROW_START_INTERP selects the start border; any other value selects the end border.
 */
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  const bool startBorder = (type == RESULT_ROW_START_INTERP);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (startBorder) {
      pCtx[i].start.key = INT64_MIN;
    } else {
      pCtx[i].end.key = INT64_MIN;
    }
  }
}
311

312
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
5,701,314✔
313
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
314
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
5,701,314✔
315

316
  TSKEY curTs = tsCols[pos];
5,701,314✔
317

318
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
5,701,314✔
319
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
5,701,314✔
320

321
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
322
  // start exactly from this point, no need to do interpolation
323
  TSKEY key = ascQuery ? win->skey : win->ekey;
5,701,314✔
324
  if (key == curTs) {
5,701,314✔
325
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
370,540✔
326
    return true;
370,540✔
327
  }
328

329
  // it is the first time window, no need to do interpolation
330
  if (pTsKey->isNull && pos == 0) {
5,330,774✔
331
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
6,317✔
332
  } else {
333
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
5,324,457✔
334
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
5,324,457✔
335
                              RESULT_ROW_START_INTERP, pSup);
336
  }
337

338
  return true;
5,330,774✔
339
}
340

341
/**
 * @brief Prepare the end-border interpolation of window win.
 *
 * @param[out] pRes false when the window does not end inside the current block; true when the end border has been
 *                  handled (either an exact row exists at the border or a value was interpolated). Not written on
 *                  the error path.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when nextRowIndex is negative although an
 *         interpolation is required.
 */
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
  int32_t order = pInfo->binfo.inputTsOrder;
  bool    asc = (order == TSDB_ORDER_ASC);
  bool    desc = (order == TSDB_ORDER_DESC);

  TSKEY actualEndKey = tsCols[endRowIndex];
  TSKEY key = asc ? win->ekey : win->skey;

  // the window continues in a later block: no end interpolation yet
  if ((key > blockEkey && asc) || (key < blockEkey && desc)) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = false;
    return TSDB_CODE_SUCCESS;
  }

  // a row sits exactly on the window border: no interpolation needed
  if (key == actualEndKey) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = true;
    return TSDB_CODE_SUCCESS;
  }

  if (nextRowIndex < 0) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  TSKEY nextKey = tsCols[nextRowIndex];
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
                            RESULT_ROW_END_INTERP, pSup);
  *pRes = true;
  return TSDB_CODE_SUCCESS;
}
376

377
/**
 * @brief Check a window against the calculation range [calStart, calEnd].
 *
 * @return false only when interval differs from sliding and the window lies completely outside the range;
 *         true in every other case.
 */
bool inCalSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) {
  bool isSliding = (pInterval->interval != pInterval->sliding);
  bool outOfRange = (pWin->ekey < calStart) || (pWin->skey > calEnd);
  return !(isSliding && outOfRange);
}
384

385
bool inSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, const SDataBlockInfo* pBlockInfo) {
2,147,483,647✔
386
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
2,147,483,647✔
387
}
388

389
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
2,147,483,647✔
390
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
391
  bool ascQuery = (order != TSDB_ORDER_DESC);
2,147,483,647✔
392

393
  int32_t precision = pInterval->precision;
2,147,483,647✔
394
  getNextTimeWindow(pInterval, pNext, order);
2,147,483,647✔
395

396
  // next time window is not in current block
397
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
2,147,483,647✔
398
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
2,147,483,647✔
399
    return -1;
9,696,633✔
400
  }
401

402
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
2,147,483,647✔
403
    return -1;
×
404
  }
405

406
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
2,147,483,647✔
407
  int32_t startPos = 0;
2,147,483,647✔
408

409
  // tumbling time window query, a special case of sliding time window query
410
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
2,147,483,647✔
411
    startPos = prevPosition + 1;
2,147,483,647✔
412
  } else {
413
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
81,370,857✔
414
      startPos = 0;
9,141,473✔
415
    } else {
416
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
75,469,571✔
417
    }
418
  }
419
  if(startPos < 0 || startPos >= pDataBlockInfo->rows) {
2,147,483,647✔
420
    return -1;
2,147,483,647✔
421
  }
422

423
  /* interp query with fill should not skip time window */
424
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
425
  //    return startPos;
426
  //  }
427

428
  /*
429
   * This time window does not cover any data, try next time window,
430
   * this case may happen when the time window is too small
431
   */
432
  if (primaryKeys != NULL) {
2,147,483,647✔
433
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
2,147,483,647✔
434
      TSKEY next = primaryKeys[startPos];
1,631,327,798✔
435
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
1,631,790,178✔
436
        pNext->skey = taosTimeTruncate(next, pInterval);
9,551,404✔
437
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
1,676✔
438
      } else {
439
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
1,632,339,568✔
440
        pNext->skey = pNext->ekey - pInterval->interval + 1;
1,635,235,674✔
441
      }
442
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
705,989,306✔
443
      TSKEY next = primaryKeys[startPos];
397,767,201✔
444
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
398,700,561✔
445
        pNext->skey = taosTimeTruncate(next, pInterval);
798,598✔
446
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
447
      } else {
448
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
398,006,992✔
449
        pNext->ekey = pNext->skey + pInterval->interval - 1;
397,773,747✔
450
      }
451
    }
452
  }
453

454
  return startPos;
2,147,483,647✔
455
}
456

457
/**
 * @brief Tell whether the requested border of a result row has already been interpolated.
 *
 * @param type RESULT_ROW_START_INTERP checks startInterp; any other value checks endInterp.
 */
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
  return (type == RESULT_ROW_START_INTERP) ? (pResult->startInterp == true) : (pResult->endInterp == true);
}
464

465
/**
 * @brief Flag the requested border of a result row as interpolated.
 *
 * @param type RESULT_ROW_START_INTERP sets startInterp; any other value sets endInterp.
 */
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  switch (type) {
    case RESULT_ROW_START_INTERP:
      pResult->startInterp = true;
      break;
    default:
      pResult->endInterp = true;
      break;
  }
}
472

473
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
2,147,483,647✔
474
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
475
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
476
  int32_t lino = 0;
2,147,483,647✔
477
  if (!pInfo->timeWindowInterpo) {
2,147,483,647✔
478
    return code;
2,147,483,647✔
479
  }
480

481
  if (pBlock == NULL) {
2,340,995✔
482
    code = TSDB_CODE_INVALID_PARA;
×
483
    return code;
×
484
  }
485

486
  if (pBlock->pDataBlock == NULL) {
2,340,995✔
487
    return code;
×
488
  }
489

490
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
5,701,314✔
491

492
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
5,701,314✔
493
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
5,701,314✔
494
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
5,701,314✔
495
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
5,701,314✔
496
    if (interp) {
5,701,314✔
497
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
5,701,314✔
498
    }
499
  } else {
500
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
×
501
  }
502

503
  // point interpolation does not require the end key time window interpolation.
504
  // interpolation query does not generate the time window end interpolation
505
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
5,701,314✔
506
  if (!done) {
5,701,314✔
507
    int32_t endRowIndex = startPos + forwardRows - 1;
5,701,314✔
508
    int32_t nextRowIndex = endRowIndex + 1;
5,701,314✔
509

510
    // duplicated ts row does not involve in the interpolation of end value for current time window
511
    int32_t x = endRowIndex;
5,701,314✔
512
    while (x > 0) {
5,717,865✔
513
      if (tsCols[x] == tsCols[x - 1]) {
5,704,871✔
514
        x -= 1;
16,551✔
515
      } else {
516
        endRowIndex = x;
5,688,320✔
517
        break;
5,688,320✔
518
      }
519
    }
520

521
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
5,701,314✔
522
    bool  interp = false;
5,701,314✔
523
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
5,701,314✔
524
                                           win, &interp);
525
    QUERY_CHECK_CODE(code, lino, _end);
5,701,314✔
526
    if (interp) {
5,701,314✔
527
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
5,684,909✔
528
    }
529
  } else {
530
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
531
  }
532

533
_end:
5,701,314✔
534
  if (code != TSDB_CODE_SUCCESS) {
5,701,314✔
535
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
536
  }
537
  return code;
5,701,314✔
538
}
539

540
/**
 * @brief For every entry of pPrevKeys, copy the last non-null value of the matching column of pBlock into it.
 *
 * Entry k of pPrevKeys corresponds to entry k of pCols, whose slotId selects the column in the block. Columns that
 * contain only nulls leave their entry untouched. Nothing happens when the block has no column array.
 */
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  size_t num = taosArrayGetSize(pPrevKeys);
  for (int32_t k = 0; k < num; ++k) {
    SColumn*         pc = taosArrayGet(pCols, k);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);
    SGroupKeys*      pkey = taosArrayGet(pPrevKeys, k);

    // walk backwards to the last row that holds a value
    int32_t row = pBlock->info.rows - 1;
    while (row >= 0 && colDataIsNull_s(pColInfo, row)) {
      --row;
    }
    if (row < 0) {
      continue;
    }

    char* val = colDataGetData(pColInfo, row);
    if (!IS_VAR_DATA_TYPE(pkey->type)) {
      memcpy(pkey->pData, val, pkey->bytes);
    } else if (IS_STR_DATA_BLOB(pkey->type)) {
      memcpy(pkey->pData, val, blobDataTLen(val));
    } else {
      memcpy(pkey->pData, val, varDataTLen(val));
    }
  }
}
572

573
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
18,273✔
574
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
575
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
18,273✔
576

577
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
18,273✔
578
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
18,273✔
579

580
  int32_t startPos = 0;
18,273✔
581
  int32_t numOfOutput = pSup->numOfExprs;
18,273✔
582

583
  SResultRow* pResult = NULL;
18,273✔
584

585
  while (1) {
×
586
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
18,273✔
587
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
18,273✔
588
    uint64_t            groupId = pOpenWin->groupId;
18,273✔
589
    SResultRowPosition* p1 = &pOpenWin->pos;
18,273✔
590
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
18,273✔
591
      break;
18,273✔
592
    }
593

594
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
595
    if (NULL == pr) {
×
596
      T_LONG_JMP(pTaskInfo->env, terrno);
×
597
    }
598

599
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
600
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
601
      T_LONG_JMP(pTaskInfo->env, terrno);
×
602
    }
603

604
    if (pr->closed) {
×
605
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
606
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
607
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
608
        T_LONG_JMP(pTaskInfo->env, terrno);
×
609
      }
610
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
611
      taosMemoryFree(pNode);
×
612
      continue;
×
613
    }
614

615
    STimeWindow w = pr->win;
×
616
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
617
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
618
    if (ret != TSDB_CODE_SUCCESS) {
×
619
      T_LONG_JMP(pTaskInfo->env, ret);
×
620
    }
621

622
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
623
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
624
      T_LONG_JMP(pTaskInfo->env, terrno);
×
625
    }
626

627
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
628
    if (!pTsKey) {
×
629
      pTaskInfo->code = terrno;
×
630
      T_LONG_JMP(pTaskInfo->env, terrno);
×
631
    }
632

633
    int64_t prevTs = *(int64_t*)pTsKey->pData;
×
634
    if (groupId == pBlock->info.id.groupId) {
×
635
      TSKEY curTs = pBlock->info.window.skey;
×
636
      if (tsCols != NULL) {
×
637
        curTs = tsCols[startPos];
×
638
      }
639
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
640
                                RESULT_ROW_END_INTERP, pSup);
641
    }
642

643
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
644
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
645

646
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
647
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
648
                                          pBlock->info.rows, numOfExprs);
×
649
    if (ret != TSDB_CODE_SUCCESS) {
×
650
      T_LONG_JMP(pTaskInfo->env, ret);
×
651
    }
652

653
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
654
      closeResultRow(pr);
×
655
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
656
      taosMemoryFree(pNode);
×
657
    } else {  // the remains are can not be closed yet.
658
      break;
×
659
    }
660
  }
661
}
18,273✔
662

663
static bool tsKeyCompFn(void* l, void* r, void* param) {
1,921,893,736✔
664
  TSKEY*                    lTS = (TSKEY*)l;
1,921,893,736✔
665
  TSKEY*                    rTS = (TSKEY*)r;
1,921,893,736✔
666
  SIntervalAggOperatorInfo* pInfo = param;
1,921,893,736✔
667
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
1,921,893,736✔
668
}
669

670
/**
 * @brief Check whether a result row for (window start key, group id) already exists in the operator's result-row
 *        hash table.
 */
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);

  void* pEntry = tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
  return pEntry != NULL;
}
675

676
/**
677
 * @brief check if cur window should be filtered out by limit info
678
 * @retval true if should be filtered out
679
 * @retval false if not filtering out
680
 * @note If no limit info, we skip filtering.
681
 *       If input/output ts order mismatch, we skip filtering too.
682
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
683
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
684
 *       every tuple in every block.
685
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
686
 */
687
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
2,147,483,647✔
688
                                  SExecTaskInfo* pTaskInfo) {
689
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
690
  int32_t lino = 0;
2,147,483,647✔
691
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
2,147,483,647✔
692
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
701,350,332✔
693
      // if input/output ts order mismatch, no filter
694
  ) {
695
    return false;
2,147,483,647✔
696
  }
697

698
  if (pOperatorInfo->limit == 0) return true;
301,270,193✔
699

700
  if (pOperatorInfo->pBQ == NULL) {
301,307,752✔
701
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
361,552✔
702
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
361,552✔
703
  }
704

705
  bool shouldFilter = false;
301,146,298✔
706
  // if BQ has been full, compare it with top of BQ
707
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
301,146,298✔
708
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
83,029,684✔
709
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
83,020,967✔
710
  }
711
  if (shouldFilter) {
300,401,942✔
712
    return true;
950,281✔
713
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
299,451,661✔
714
    return false;
122,116,345✔
715
  }
716

717
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
718
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
178,194,509✔
719
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
177,856,441✔
720

721
  *((TSKEY*)node.data) = win->skey;
177,856,441✔
722

723
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
177,916,323✔
724
    taosMemoryFree(node.data);
×
725
    return true;
×
726
  }
727

728
_end:
178,316,168✔
729
  if (code != TSDB_CODE_SUCCESS) {
178,006,146✔
UNCOV
730
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
731
    pTaskInfo->code = code;
×
732
    T_LONG_JMP(pTaskInfo->env, code);
×
733
  }
734
  return false;
178,138,417✔
735
}
736

737
/**
 * @brief Count the consecutive rows, starting at startPos, whose timestamp lies inside [win->skey, win->ekey].
 * @param pDataBlockInfo block info; only its row count is read
 * @param pPrimaryColumn timestamp column of the block
 * @param win            time window, both bounds inclusive
 * @param startPos       index of the first row to examine
 * @return number of rows before the first one outside the window, or (rows - startPos) if none is outside
 */
int32_t getNumOfRowsInTimeWinUnsorted(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, STimeWindow* win,
                                      int32_t startPos) {
  int32_t cursor = startPos;

  while (cursor < pDataBlockInfo->rows) {
    TSKEY ts = pPrimaryColumn[cursor];
    bool  inside = (ts >= win->skey) && (ts <= win->ekey);
    if (!inside) {
      return cursor - startPos;
    }
    cursor++;
  }

  // every remaining row belongs to the window
  int32_t numOfRows = pDataBlockInfo->rows;
  return numOfRows - startPos;
}
749

750
/**
 * @brief Aggregate one input block into the interval windows its rows fall into.
 *
 * The first window is derived from the block's start timestamp; the remaining windows are visited in a loop
 * until the block is exhausted or the limit filter rejects a window. Any failure unwinds through
 * T_LONG_JMP on the task env.
 *
 * @param pOperatorInfo  interval operator
 * @param pResultRowInfo result row bookkeeping (including the open window list)
 * @param pBlock         input block
 * @param scanFlag       scan flag of the block; compared against MAIN_SCAN when fetching result rows
 * @return true when a new group is met and the number of handled groups exceeds slimit (the caller stops
 *         reading blocks); false otherwise
 */
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  SResultRow* pResult = NULL;
  // the block is handled as sorted when the input order is asc/desc or when no ts column is available
  bool        sorted = pInfo->binfo.inputTsOrder == ORDER_ASC || pInfo->binfo.inputTsOrder == ORDER_DESC || tsCols == NULL;
  TSKEY       ts = sorted ? getStartTsKey(&pBlock->info.window, tsCols) : tsCols[startPos];

  if (tableGroupId != pInfo->curGroupId) {
    pInfo->handledGroupNum += 1;
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
      return true;
    } else {
      // new group: the per-group bounded queue is dropped and re-created on demand
      pInfo->curGroupId = tableGroupId;
      destroyBoundedQueue(pInfo->pBQ);
      pInfo->pBQ = NULL;
    }
  }

  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;

  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret == TSDB_CODE_SUCCESS && pResult == NULL) {
    // a missing result row is an error; never unwind with a success code
    ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey,
                                                          NULL, pInfo->binfo.inputTsOrder)
                               : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &win, startPos);

  // prev time window not interpolation yet.
  if (pInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret == TSDB_CODE_SUCCESS && pResult == NULL) {
      // pResult is used right below; treat a missing row as an error
      ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                        pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  doCloseWindow(pResultRowInfo, pInfo, pResult);

  STimeWindow nextWin = win;
  int32_t     rows = pBlock->info.rows;

  while (startPos < pBlock->info.rows) {
    if (sorted) {
      startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, forwardRows - 1 + startPos,
                                        pInfo->binfo.inputTsOrder);
      if (startPos < 0) {
        break;
      }
    } else {
      // temporarily shrink the block to the rows of the current window, searching relative to startPos;
      // the row count is restored right after the call
      pBlock->info.rows = forwardRows;
      int32_t newStartOff = forwardRows >= 1
                                ? getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols + startPos,
                                                         forwardRows - 1, pInfo->binfo.inputTsOrder)
                                : -1;
      pBlock->info.rows = rows;
      if (newStartOff >= 0) {
        startPos += newStartOff;
      } else if ((startPos += forwardRows) < pBlock->info.rows) {
        // no qualified window inside the current run: start over from the next row's own window
        getInitialStartTimeWindow(&pInfo->interval, tsCols[startPos], &nextWin, true);
      }
      if (startPos >= pBlock->info.rows) {
        break;
      }
    }

    if (filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
      break;
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                    pInfo->binfo.inputTsOrder)
                         : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &nextWin, startPos);
    if (forwardRows == 0) continue;

    // null data, failed to allocate more memory buffer
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (code == TSDB_CODE_SUCCESS && pResult == NULL) {
      // a missing result row is an error; never unwind with a success code
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // window start(end) key interpolation
    code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    // TODO: add to open window? how to close the open windows after input blocks exhausted?

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                          pBlock->info.rows, numOfOutput);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    doCloseWindow(pResultRowInfo, pInfo, pResult);
  }

  if (pInfo->timeWindowInterpo) {
    // keep the last row of this block for interpolation with the next block
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }
  return false;
}
894

895
/**
 * @brief Close a result row once its end border has been interpolated.
 *
 * Does nothing unless time-window interpolation is enabled and the row carries the
 * RESULT_ROW_END_INTERP mark. Otherwise the row is closed and the head node of the
 * open window list is popped and freed.
 *
 * @param pResultRowInfo owner of the open window list
 * @param pInfo          interval operator info (timeWindowInterpo flag)
 * @param pResult        result row that has just been computed
 */
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }
  if (!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    return;
  }

  // current result is done in computing final results.
  closeResultRow(pResult);
  SListNode* pHead = tdListPopHead(pResultRowInfo->openWindow);
  taosMemoryFree(pHead);
}
2,147,483,647✔
903

904
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
18,273✔
905
                                       SExecTaskInfo* pTaskInfo) {
906
  int32_t         code = TSDB_CODE_SUCCESS;
18,273✔
907
  int32_t         lino = 0;
18,273✔
908
  SOpenWindowInfo openWin = {0};
18,273✔
909
  openWin.pos.pageId = pResult->pageId;
18,273✔
910
  openWin.pos.offset = pResult->offset;
18,273✔
911
  openWin.groupId = groupId;
18,273✔
912
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
18,273✔
913
  if (pn == NULL) {
18,273✔
914
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
18,273✔
915
    QUERY_CHECK_CODE(code, lino, _end);
18,273✔
916
    return openWin.pos;
18,273✔
917
  }
918

919
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
×
920
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
×
921
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
922
    QUERY_CHECK_CODE(code, lino, _end);
×
923
  }
924

925
_end:
×
926
  if (code != TSDB_CODE_SUCCESS) {
×
927
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
928
    pTaskInfo->code = code;
×
929
    T_LONG_JMP(pTaskInfo->env, code);
×
930
  }
931
  return openWin.pos;
×
932
}
933

934
/**
 * @brief Fetch the primary timestamp column of a block and make sure the block's ts window is set.
 *
 * @param pBlock    input block; its info.window may be refreshed through blockDataUpdateTsWindow
 * @param pInfo     interval operator info (primaryTsIndex)
 * @param pTaskInfo task info; on failure its code is set and the task env is long-jumped
 * @return the timestamp array of the block, or NULL when the block has no column array, its data is not
 *         loaded, or the ts column has no data buffer
 */
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  if (!pColDataInfo) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
  if (tsCols == NULL || pBlock->info.rows <= 0) {
    // nothing to inspect: reading tsCols[0] / tsCols[rows - 1] below would be out of bounds
    return tsCols;
  }

  if (tsCols[0] == 0) {
    qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
          tsCols[pBlock->info.rows - 1]);
  }

  // a zeroed window together with a non-zero first timestamp means the window has not been computed yet
  if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
    int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
962

963
/**
 * @brief Open the interval operator: drain the downstream and aggregate every block into its windows.
 *
 * Returns immediately when the operator is already opened. Otherwise blocks are pulled until the
 * downstream is exhausted or hashIntervalAgg asks to stop, then the grouped result info is initialised
 * and the operator is marked opened.
 *
 * @param pOperator interval operator
 * @return TSDB_CODE_SUCCESS; any failure sets pTaskInfo->code and long-jumps on the task env
 */
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                pSup = &pOperator->exprSupp;

  pInfo->cleanGroupResInfo = false;

  for (;;) {
    SSDataBlock* pBlock = getNextBlockFromDownstreamRemainDetach(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    // the output block carries the scan flag of the input block currently processed
    int32_t scanFlag = pBlock->info.scanFlag;
    pInfo->binfo.pRes->info.scanFlag = scanFlag;

    // scalar expressions, if any, are evaluated in place before aggregation
    SExprSupp* pScalarSup = &pInfo->scalarSupp;
    if (pScalarSup->pExprInfo != NULL) {
      code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                   NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    bool stop = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag);
    if (stop) {
      break;
    }
  }

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  OPTR_SET_OPENED(pOperator);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1014

1015
// start a new state window and record the start info
1016
void doKeepNewStateWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList,
2,147,483,647✔
1017
  int32_t rowIndex, uint64_t groupId, const EStateWinExtendOption* extendOption, bool hasPrevWin) {
1018
  pRowSup->groupId = groupId;
2,147,483,647✔
1019
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT ||
2,147,483,647✔
1020
      *extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
2,147,483,647✔
1021
    pRowSup->win.skey = tsList[rowIndex];
2,147,483,647✔
1022
    pRowSup->startRowIndex = rowIndex;
2,147,483,647✔
1023
    pRowSup->numOfRows = 0;  // does not include the current row yet
2,147,483,647✔
1024
  } else {
1025
    pRowSup->startRowIndex = hasContinuousNullRows(pRowSup) ?
2,147,483,647✔
1026
      rowIndex - pRowSup->numNullRows : rowIndex;
1,367,592,084✔
1027
    pRowSup->win.skey = hasPrevWin ?
1,367,592,084✔
1028
                        pRowSup->win.ekey + 1 : tsList[pRowSup->startRowIndex];
1,367,592,084✔
1029
    pRowSup->numOfRows = rowIndex - pRowSup->startRowIndex;
1,367,592,084✔
1030
  }
1031
  resetNumNullRows(pRowSup);
2,147,483,647✔
1032
}
2,147,483,647✔
1033

1034
// close a state window and record its end info
1035
// this functions is called when a new state row appears
1036
// @param rowIndex the index of the first row of next window
1037
void doKeepCurStateWindowEndInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, 
2,147,483,647✔
1038
                                 int32_t rowIndex,
1039
                                 const EStateWinExtendOption* extendOption,
1040
                                 bool hasNextWin) {
1041
  if (*extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
2,147,483,647✔
1042
      pRowSup->win.ekey = hasNextWin?
1,379,830,808✔
1043
                          tsList[rowIndex] - 1 : pRowSup->prevTs;
1,379,830,397✔
1044
      // continuous rows having null state col should be included in this window
1045
      pRowSup->numOfRows += hasContinuousNullRows(pRowSup) ?
2,147,483,647✔
1046
        pRowSup->numNullRows : 0;
1,379,830,808✔
1047
      resetNumNullRows(pRowSup);
1,379,830,808✔
1048
  }
1049
}
2,147,483,647✔
1050

1051
/**
 * @brief Account for one row whose state column is null.
 * @param pRowSup   window row bookkeeping; numNullRows is incremented and prevTs is overwritten
 * @param nullRowTs timestamp of the null row
 */
void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, TSKEY nullRowTs) {
  pRowSup->prevTs = nullRowTs;
  pRowSup->numNullRows++;
}
1,137,339,206✔
1055

1056
/**
1057
  @brief Process the closed state window and do aggregation on the tuples
1058
  within the window. Partial results are stored in the output buffer. If window
1059
  has no valid rows, return success.
1060
*/
1061
static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo,
2,147,483,647✔
1062
                                        SWindowRowsSup* pRowSup,
1063
                                        SExecTaskInfo* pTaskInfo,
1064
                                        SExprSupp* pSup,
1065
                                        int32_t numOfOutput) {
1066
  if (pRowSup->numOfRows == 0) {
2,147,483,647✔
1067
    // no valid rows in the window
1068
    return TSDB_CODE_SUCCESS;
26,039,192✔
1069
  }
1070
  int32_t     code = TSDB_CODE_SUCCESS;
2,147,483,647✔
1071
  int32_t     lino = 0;
2,147,483,647✔
1072
  SResultRow* pResult = NULL;
2,147,483,647✔
1073
  code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win,
2,147,483,647✔
1074
    true, &pResult, pRowSup->groupId, pSup->pCtx, numOfOutput,
2,147,483,647✔
1075
    pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1076
  QUERY_CHECK_CODE(code, lino, _return);
2,147,483,647✔
1077

1078
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
2,147,483,647✔
1079
  pResult->nOrigRows += pRowSup->numOfRows;
2,147,483,647✔
1080
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx,
2,147,483,647✔
1081
    &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1082
    pRowSup->numOfRows, 0, numOfOutput);
1083
  QUERY_CHECK_CODE(code, lino, _return);
2,147,483,647✔
1084

1085
_return:
2,147,483,647✔
1086
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
1087
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
471✔
1088
  }
1089
  return code;
2,147,483,647✔
1090
}
1091

1092
// process a data block for state window aggregation
1093
// scan from startIndex to endIndex
1094
// numPartialCalcRows returns the number of rows that have been
1095
// partially calculated within the block
1096
static void doStateWindowAggImpl(SOperatorInfo* pOperator,
43,855,449✔
1097
                                 SStateWindowOperatorInfo* pInfo,
1098
                                 SSDataBlock* pBlock, int32_t* startIndex,
1099
                                 int32_t* endIndex,
1100
                                 int32_t* numPartialCalcRows) {
1101
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
43,855,449✔
1102
  SExprSupp*     pExprSup = &pOperator->exprSupp;
43,855,449✔
1103

1104
  SColumnInfoData* pStateColInfoData = 
1105
    taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
43,855,449✔
1106
  if (!pStateColInfoData) {
43,855,449✔
1107
    pTaskInfo->code = terrno;
×
1108
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1109
  }
1110
  uint64_t gid = pBlock->info.id.groupId;
43,855,449✔
1111
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
43,855,449✔
1112
  int32_t bytes = pStateColInfoData->info.bytes;
43,855,449✔
1113

1114
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock,
43,855,449✔
1115
                                               pInfo->tsSlotId);
43,855,038✔
1116
  if (NULL == pColInfoData) {
43,855,038✔
1117
    pTaskInfo->code = terrno;
×
1118
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1119
  }
1120
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
43,855,038✔
1121

1122
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ?
43,855,449✔
1123
                                &pBlock->pBlockAgg[pInfo->stateCol.slotId] :
43,855,449✔
1124
                                NULL;
1125
  EStateWinExtendOption  extendOption = pInfo->extendOption;
43,855,449✔
1126
  SWindowRowsSup*        pRowSup = &pInfo->winSup;
43,855,449✔
1127

1128
  if (pRowSup->groupId != gid) {
43,855,449✔
1129
    /*
1130
      group changed, process the previous group's unclosed state window first
1131
    */
1132
    doKeepCurStateWindowEndInfo(pRowSup, tsList, 0, &extendOption, false);
16,387,193✔
1133
    int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
16,386,782✔
1134
                                            pExprSup, numOfOutput);
1135
    if (TSDB_CODE_SUCCESS != code) T_LONG_JMP(pTaskInfo->env, code);
16,387,193✔
1136
    *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
16,387,193✔
1137

1138
    /*
1139
      unhandled null rows should be ignored, since they belong to previous group
1140
    */
1141
    *numPartialCalcRows += pRowSup->numNullRows;
16,387,193✔
1142

1143
    /*
1144
      reset state window info for new group
1145
    */
1146
    pInfo->hasKey = false;
16,386,782✔
1147
    resetWindowRowsSup(pRowSup);
16,387,193✔
1148
  }
1149

1150
  for (int32_t j = *startIndex; j < *endIndex; ++j) {
2,147,483,647✔
1151
    if (pBlock->info.scanFlag != PRE_SCAN) {
2,147,483,647✔
1152
      if (pInfo->winSup.lastTs == INT64_MIN || gid != pRowSup->groupId || !pInfo->hasKey) {
2,147,483,647✔
1153
        pInfo->winSup.lastTs = tsList[j];
519,542,439✔
1154
      } else {
1155
        if (tsList[j] == pInfo->winSup.lastTs) {
2,147,483,647✔
1156
          // forbid duplicated ts rows
1157
          qError("%s:%d duplicated ts found in state window aggregation", __FILE__, __LINE__);
22,610✔
1158
          pTaskInfo->code = TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP;
22,610✔
1159
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP);
22,610✔
1160
        } else {
1161
          pInfo->winSup.lastTs = tsList[j];
2,147,483,647✔
1162
        }
1163
      }
1164
    }
1165
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
2,147,483,647✔
1166
      doKeepStateWindowNullInfo(pRowSup, tsList[j]);
1,137,339,206✔
1167
      continue;
1,137,339,206✔
1168
    }
1169
    if (pStateColInfoData->pData == NULL) {
2,147,483,647✔
1170
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1171
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1172
    }
1173
    char* val = colDataGetData(pStateColInfoData, j);
2,147,483,647✔
1174

1175
    if (!pInfo->hasKey) {
2,147,483,647✔
1176
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
5,911,776✔
1177
      pInfo->hasKey = true;
5,911,776✔
1178
      doKeepNewStateWindowStartInfo(
5,911,776✔
1179
        pRowSup, tsList, j, gid, &extendOption, false);
1180
      doKeepTuple(pRowSup, tsList[j], j, gid);
5,911,776✔
1181
    } else if (!compareVal(val, &pInfo->stateKey)) {
2,147,483,647✔
1182
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
2,147,483,647✔
1183
      int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
2,147,483,647✔
1184
                                              pExprSup, numOfOutput);
1185
      if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1186
        T_LONG_JMP(pTaskInfo->env, code);
×
1187
      }
1188
      *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
2,147,483,647✔
1189

1190
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid,
2,147,483,647✔
1191
                                    &extendOption, true);
1192
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1193
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
2,147,483,647✔
1194
    } else {
1195
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1196
    }
1197
  }
1198

1199
  if (!pInfo->hasKey && extendOption != STATE_WIN_EXTEND_OPTION_FORWARD) {
43,832,839✔
1200
    /*
1201
      No valid state rows within the block and we don't care about
1202
      null rows before valid state window, mark them as processed and drop them
1203
    */
1204
    *numPartialCalcRows = pBlock->info.rows;
7,906,336✔
1205
    resetNumNullRows(pRowSup);
7,906,336✔
1206
    return;
7,906,336✔
1207
  }
1208
  if (pRowSup->numOfRows == 0 && 
35,926,503✔
1209
      extendOption != STATE_WIN_EXTEND_OPTION_BACKWARD) {
6,979,428✔
1210
    /*
1211
      If no valid state window or we don't know the belonging of
1212
      null rows in the end of the block, handle them with next block
1213
    */
1214
    return;
5,820,360✔
1215
  }
1216
  doKeepCurStateWindowEndInfo(pRowSup, tsList, *endIndex, &extendOption, false);
30,106,143✔
1217
  int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
30,106,143✔
1218
                                          pExprSup, numOfOutput);
1219
  if (TSDB_CODE_SUCCESS != code) {
30,106,143✔
1220
    pTaskInfo->code = code;
471✔
1221
    T_LONG_JMP(pTaskInfo->env, code);
471✔
1222
  }
1223
  *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
30,105,672✔
1224
  // reset part of pRowSup after doing agg calculation
1225
  pRowSup->startRowIndex = 0;
30,105,672✔
1226
  pRowSup->numOfRows = 0;
30,105,672✔
1227
}
1228

1229
/**
 * @brief Open the state window operator: drain the downstream and aggregate every block.
 *
 * Rows that could not be fully processed in one round are kept in a carry-over block
 * (pUnfinishedBlock) and merged with the next downstream block. The carry-over block is either a
 * plain reference to the downstream block (isRef) or a private copy owned by this function.
 *
 * @param pOperator state window operator
 * @return TSDB_CODE_SUCCESS; failures detected here set pTaskInfo->code and long-jump on the task env
 * @note Errors raised inside doStateWindowAggImpl unwind through their own long jump and therefore
 *       bypass the cleanup done at _end.
 */
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = pInfo->binfo.inputTsOrder;

  pInfo->cleanGroupResInfo = false;

  SSDataBlock* pUnfinishedBlock = NULL;
  // whether pUnfinishedBlock is a reference to the downstream block (not owned here)
  bool         isRef = false;
  int32_t      startIndex = 0;
  int32_t      endIndex = 0;
  int32_t      numPartialCalcRows = 0;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      if (pUnfinishedBlock != NULL) {
        // downstream exhausted: the carried-over rows are dropped
        blockDataDestroy(pUnfinishedBlock);
        pUnfinishedBlock = NULL;
        resetWindowRowsSup(&pInfo->winSup);
      }
      break;
    }

    isRef = false;
    startIndex = 0;
    if (pUnfinishedBlock != NULL) {
      startIndex = pUnfinishedBlock->info.rows;
      // merge unfinished block with current block
      code = blockDataMerge(pUnfinishedBlock, pBlock);
      // reset id to current block id
      pUnfinishedBlock->info.id = pBlock->info.id;
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      pUnfinishedBlock = pBlock;
      isRef = true;
    }
    endIndex = pUnfinishedBlock->info.rows;

    pInfo->binfo.pRes->info.scanFlag = pUnfinishedBlock->info.scanFlag;
    pInfo->binfo.pRes->info.dataLoad = 1;
    code = setInputDataBlock(
      pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pUnfinishedBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that
    // needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo,
        pUnfinishedBlock, pUnfinishedBlock, pInfo->scalarSup.pCtx,
        pInfo->scalarSup.numOfExprs, NULL,
        GET_STM_RTINFO(pOperator->pTaskInfo));
      // routed through _end so the failure is logged and an owned block is released
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doStateWindowAggImpl(pOperator, pInfo, pUnfinishedBlock,
      &startIndex, &endIndex, &numPartialCalcRows);
    if (numPartialCalcRows < pUnfinishedBlock->info.rows) {
      // save unfinished block for next round processing
      if (isRef) {
        code = createOneDataBlock(pBlock, true, &pUnfinishedBlock);
        QUERY_CHECK_CODE(code, lino, _end);
        // from here on the carry-over block is a private copy
        isRef = false;
      }
      code = blockDataTrimFirstRows(pUnfinishedBlock, numPartialCalcRows);
      // the trim result itself must be checked, not the block pointer
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      if (!isRef) {
        blockDataDestroy(pUnfinishedBlock);
      }
      pUnfinishedBlock = NULL;
    }
    numPartialCalcRows = 0;
  }

  code = initGroupedResultInfo(
    &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;
  pOperator->status = OP_RES_TO_RETURN;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // release the carry-over block only when it is owned by this function
    if (!isRef && pUnfinishedBlock != NULL) {
      blockDataDestroy(pUnfinishedBlock);
      pUnfinishedBlock = NULL;
    }
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1330

1331
// Produce the next result block of the state window operator.
//
// The operator is opened through its registered _openFn, the result block is sized to
// resultInfo.capacity, and result rows are built and filtered until either a non-empty
// block is available or no grouped results remain (the operator is then marked completed).
// *ppRes is NULL when the operator had already finished or when the final block is empty.
// Any failure is logged, stored in pTaskInfo->code and raised with T_LONG_JMP.
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (OP_EXEC_DONE == pOperator->status) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;
  int32_t                   lino = 0;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  for (;;) {
    doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);

    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // nothing left to emit: mark the operator as finished and hand back what we have
    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // the filter may have removed every row; keep building until something survives
    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pBInfo->pRes->info.rows == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pBInfo->pRes;
  }
  return code;
}
1374

1375
// Produce the next result block of the interval aggregation operator.
//
// A finished operator returns NULL unless a pending get-param is attached; in that case the
// optional resetStateFn is invoked (only when the operator had finished), the param is
// released, and the operator is opened again through _openFn. Result rows are then built
// and filtered until a non-empty block is available or no grouped results remain.
// Any failure is logged, stored in pTaskInfo->code and raised with T_LONG_JMP.
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*              pBlock = NULL;
  const bool                finished = (pOperator->status == OP_EXEC_DONE);

  if (pOperator->pOperatorGetParam == NULL) {
    if (finished) {
      *ppRes = NULL;
      return code;
    }
  } else {
    if (finished && pOperator->fpSet.resetStateFn != NULL) {
      code = pOperator->fpSet.resetStateFn(pOperator);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
    pOperator->pOperatorGetParam = NULL;
  }

  pBlock = pInfo->binfo.pRes;
  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);

    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // no more grouped results: the operator is done after this block
    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // skip blocks that were emptied by the filter
    if (pBlock->info.rows > 0) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pBlock->info.rows == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pBlock;
  }
  return code;
}
1424

1425
static void destroyStateWindowOperatorInfo(void* param) {
1,072,126✔
1426
  if (param == NULL) {
1,072,126✔
1427
    return;
×
1428
  }
1429
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
1,072,126✔
1430
  cleanupBasicInfo(&pInfo->binfo);
1,072,126✔
1431
  taosMemoryFreeClear(pInfo->stateKey.pData);
1,072,126✔
1432
  if (pInfo->pOperator) {
1,072,126✔
1433
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,072,126✔
1434
                      pInfo->cleanGroupResInfo);
1,072,126✔
1435
    pInfo->pOperator = NULL;
1,072,126✔
1436
  }
1437

1438
  cleanupExprSupp(&pInfo->scalarSup);
1,072,126✔
1439
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,072,126✔
1440
  cleanupAggSup(&pInfo->aggSup);
1,072,126✔
1441
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,072,126✔
1442

1443
  taosMemoryFreeClear(param);
1,072,126✔
1444
}
1445

1446
static void freeItem(void* param) {
34,302✔
1447
  SGroupKeys* pKey = (SGroupKeys*)param;
34,302✔
1448
  taosMemoryFree(pKey->pData);
34,302✔
1449
}
34,302✔
1450

1451
void destroyIntervalOperatorInfo(void* param) {
5,755,944✔
1452
  if (param == NULL) {
5,755,944✔
1453
    return;
×
1454
  }
1455

1456
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
5,755,944✔
1457

1458
  cleanupBasicInfo(&pInfo->binfo);
5,755,944✔
1459
  if (pInfo->pOperator) {
5,755,403✔
1460
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
5,718,984✔
1461
                      pInfo->cleanGroupResInfo);
5,718,989✔
1462
    pInfo->pOperator = NULL;
5,718,443✔
1463
  }
1464

1465
  cleanupAggSup(&pInfo->aggSup);
5,756,324✔
1466
  cleanupExprSupp(&pInfo->scalarSupp);
5,755,857✔
1467

1468
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
5,756,789✔
1469

1470
  taosArrayDestroy(pInfo->pInterpCols);
5,757,037✔
1471
  pInfo->pInterpCols = NULL;
5,755,993✔
1472

1473
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
5,756,541✔
1474
  pInfo->pPrevValues = NULL;
5,755,602✔
1475

1476
  cleanupGroupResInfo(&pInfo->groupResInfo);
5,755,850✔
1477
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
5,756,353✔
1478
  destroyBoundedQueue(pInfo->pBQ);
5,755,016✔
1479
  taosMemoryFreeClear(param);
5,754,806✔
1480
}
1481

1482
// Allocate the interpolation bookkeeping arrays of pInfo and register the primary
// timestamp column as their first entry.
//
// pInfo->pInterpCols receives a SColumn describing the timestamp slot and pInfo->pPrevValues
// receives a matching SGroupKeys whose isNull flag is set to mark "no value assigned yet".
// Returns TSDB_CODE_SUCCESS or the error taken from terrno; arrays that were already
// created stay attached to pInfo on failure so the caller's destroy path can release them.
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   tmp = NULL;

  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);

  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);

  {  // ts column
    SColumn c = {0};
    c.colId = 1;
    c.slotId = pInfo->primaryTsIndex;
    c.type = TSDB_DATA_TYPE_TIMESTAMP;
    c.bytes = sizeof(int64_t);
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    // zero the whole key so that no indeterminate field is copied into the array
    SGroupKeys key = {0};
    key.bytes = c.bytes;
    key.type = c.type;
    key.isNull = true;  // to denote no value is assigned yet
    key.pData = taosMemoryCalloc(1, c.bytes);
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

    tmp = taosArrayPush(pInfo->pPrevValues, &key);
    if (tmp == NULL) {
      // the array did not take ownership of the buffer, release it here
      code = terrno;
      lino = __LINE__;
      taosMemoryFree(key.pData);
      goto _end;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1518

1519
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
5,718,066✔
1520
                                      bool* pRes) {
1521
  // the primary timestamp column
1522
  bool    needed = false;
5,718,066✔
1523
  int32_t code = TSDB_CODE_SUCCESS;
5,718,066✔
1524
  int32_t lino = 0;
5,718,066✔
1525
  void*   tmp = NULL;
5,718,066✔
1526

1527
  for (int32_t i = 0; i < numOfCols; ++i) {
27,061,932✔
1528
    SExprInfo* pExpr = pCtx[i].pExpr;
21,366,265✔
1529
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
21,363,661✔
1530
      needed = true;
17,151✔
1531
      break;
17,151✔
1532
    }
1533
  }
1534

1535
  if (needed) {
5,712,818✔
1536
    code = initWindowInterpPrevVal(pInfo);
17,151✔
1537
    QUERY_CHECK_CODE(code, lino, _end);
17,151✔
1538
  }
1539

1540
  for (int32_t i = 0; i < numOfCols; ++i) {
27,073,672✔
1541
    SExprInfo* pExpr = pCtx[i].pExpr;
21,358,162✔
1542

1543
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
21,365,095✔
1544
      SFunctParam* pParam = &pExpr->base.pParam[0];
17,151✔
1545

1546
      SColumn c = *pParam->pCol;
17,151✔
1547
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
17,151✔
1548
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
17,151✔
1549

1550
      SGroupKeys key = {0};
17,151✔
1551
      key.bytes = c.bytes;
17,151✔
1552
      key.type = c.type;
17,151✔
1553
      key.isNull = false;
17,151✔
1554
      key.pData = taosMemoryCalloc(1, c.bytes);
17,151✔
1555
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
17,151✔
1556

1557
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
17,151✔
1558
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
17,151✔
1559
    }
1560
  }
1561

1562
_end:
5,715,510✔
1563
  if (code != TSDB_CODE_SUCCESS) {
5,716,959✔
1564
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1565
  }
1566
  *pRes = needed;
5,716,959✔
1567
  return code;
5,714,418✔
1568
}
1569

1570
// Bring an interval aggregation operator back to the OP_NOT_OPENED state so that it can be
// executed again.
//
// Result rows, the time window column, the aggregate and scalar expression supports, the
// open-window list, the interpolation arrays, the grouped result info and the bounded queue
// are released or re-initialised from the operator's physical plan node.
// All reset steps after a failure are still attempted, as before; the FIRST error code is
// the one returned (a later step can no longer overwrite it).
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo) {
  SExecTaskInfo*      pTaskInfo = pOper->pTaskInfo;
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pIntervalInfo->binfo);
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp,
                    &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup, pIntervalInfo->cleanGroupResInfo);

  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                       &pTaskInfo->storageAPI.functionStore);
  }
  if (code == 0) {
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL) {
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
  }

  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
    pIntervalInfo->curGroupId = UINT64_MAX;
  }

  pIntervalInfo->cleanGroupResInfo = false;
  pIntervalInfo->handledGroupNum = 0;
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;

  taosArrayDestroy(pIntervalInfo->pInterpCols);
  pIntervalInfo->pInterpCols = NULL;

  if (pIntervalInfo->pPrevValues != NULL) {
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
    pIntervalInfo->pPrevValues = NULL;

    // NOTE(review): only the timestamp entry is re-created here, whereas operator creation
    // (timeWindowinterpNeeded) also registers one entry per interpolation function --
    // confirm whether those entries must be restored after a reset.
    int32_t ret = initWindowInterpPrevVal(pIntervalInfo);
    if (code == 0) {
      // do not let a successful re-init hide an earlier failure
      code = ret;
    }
  }

  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
  destroyBoundedQueue(pIntervalInfo->pBQ);
  pIntervalInfo->pBQ = NULL;
  return code;
}
1618

1619
// Reset-state callback of the interval operator: forwards to resetInterval() with the
// operator's own info structure.
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
  return resetInterval(pOper, (SIntervalAggOperatorInfo*)pOper->info);
}
1623

1624
// Create the hash-interval aggregation operator described by pPhyNode on top of downstream.
//
// On success the new operator is stored in *pOptrInfo and TSDB_CODE_SUCCESS is returned.
// On failure everything built so far is released through destroyIntervalOperatorInfo() and
// destroyOperatorAndDownstreams() (which is also handed the downstream operator); the error
// is stored in pTaskInfo->code, logged together with the failing line, and returned.
// Every failure path records both an error code and the line number.
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }
  initOperatorCostInfo(pOperator);

  pOperator->pPhyNode = pPhyNode;
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pSup->hasWindowOrGroup = true;
  pSup->hasWindow = true;

  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 512);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, NULL,
                    &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  SInterval interval = {.interval = pPhyNode->interval,
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
                        .timeRange = pPhyNode->timeRange};
  calcIntervalAutoOffset(&interval);

  STimeWindowAggSupp as = {
      .maxTs = INT64_MIN,
  };

  pInfo->win = pTaskInfo->window;
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
  pInfo->interval = interval;
  pInfo->twAggSup = as;
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;

  // LIMIT: remember limit + offset as the number of rows to keep
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
    pInfo->limited = true;
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
  }

  // SLIMIT: same computation, plus a sentinel "no current group" id
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
    pInfo->slimited = true;
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
    pInfo->curGroupId = UINT64_MAX;
  }

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->timeWindowInterpo = false;
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
  QUERY_CHECK_CODE(code, lino, _error);
  if (pInfo->timeWindowInterpo) {
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
    // a failed allocation must not leave code == TSDB_CODE_SUCCESS on the error path
    QUERY_CHECK_NULL(pInfo->binfo.resultRowInfo.openWindow, code, lino, _error, terrno);
  }

  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);

  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }

  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
1753

1754
// todo handle multiple timeline cases. assume no timeline interweaving
1755
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
20,142,948✔
1756
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
20,142,948✔
1757
  SExprSupp*     pSup = &pOperator->exprSupp;
20,142,948✔
1758

1759
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
20,142,948✔
1760
  if (!pColInfoData) {
20,142,948✔
1761
    pTaskInfo->code = terrno;
×
1762
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1763
  }
1764

1765
  bool    masterScan = true;
20,142,948✔
1766
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
20,142,948✔
1767
  int64_t gid = pBlock->info.id.groupId;
20,142,948✔
1768

1769
  int64_t gap = pInfo->gap;
20,142,948✔
1770

1771
  if (!pInfo->reptScan) {
20,142,948✔
1772
    pInfo->reptScan = true;
533,501✔
1773
    pInfo->winSup.prevTs = INT64_MIN;
533,501✔
1774
  }
1775

1776
  SWindowRowsSup* pRowSup = &pInfo->winSup;
20,142,948✔
1777
  pRowSup->numOfRows = 0;
20,142,948✔
1778
  pRowSup->startRowIndex = 0;
20,142,948✔
1779

1780
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
1781
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
20,142,948✔
1782
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
2,147,483,647✔
1783
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
2,147,483,647✔
1784
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
5,361,098✔
1785
      doKeepTuple(pRowSup, tsList[j], j, gid);
5,361,098✔
1786
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
2,147,483,647✔
1787
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
1,319,414,988✔
1788
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1789
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1790
    } else {  // start a new session window
1791
      // start a new session window
1792
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
1,319,323,118✔
1793
        SResultRow* pResult = NULL;
1,315,686,487✔
1794

1795
        // keep the time window for the closed time window.
1796
        STimeWindow window = pRowSup->win;
1,315,686,487✔
1797

1798
        int32_t ret =
1799
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
1,315,686,487✔
1800
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1801
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
1,315,686,487✔
1802
          T_LONG_JMP(pTaskInfo->env, ret);
×
1803
        }
1804

1805
        // pInfo->numOfRows data belong to the current session window
1806
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
1,315,686,487✔
1807
        ret =
1808
            applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
1,315,686,487✔
1809
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1,315,686,487✔
1810
        if (ret != TSDB_CODE_SUCCESS) {
1,315,686,487✔
1811
          T_LONG_JMP(pTaskInfo->env, ret);
×
1812
        }
1813
      }
1814

1815
      // here we start a new session window
1816
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
1,319,323,118✔
1817
      doKeepTuple(pRowSup, tsList[j], j, gid);
1,319,323,118✔
1818
    }
1819
  }
1820

1821
  SResultRow* pResult = NULL;
20,142,948✔
1822
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
20,142,948✔
1823
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
20,142,948✔
1824
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1825
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
20,142,948✔
1826
    T_LONG_JMP(pTaskInfo->env, ret);
×
1827
  }
1828

1829
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
20,142,948✔
1830
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
20,142,948✔
1831
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
20,142,948✔
1832
  if (ret != TSDB_CODE_SUCCESS) {
20,142,948✔
1833
    T_LONG_JMP(pTaskInfo->env, ret);
×
1834
  }
1835
}
20,142,948✔
1836

1837
// Produce the next result block of the session window operator.
//
// On the first call all downstream blocks are consumed: optional scalar expressions are
// applied, the block is bound as aggregate input, its timestamp window is refreshed, and
// doSessionWindowAggImpl() aggregates it. The operator then switches to OP_RES_TO_RETURN
// and the grouped results are initialised. On every call result rows are built and filtered
// until a non-empty block is available or no grouped results remain (the operator is then
// marked completed). *ppRes is NULL when the operator had already finished or the block is
// empty. Any failure is logged, stored in pTaskInfo->code and raised with T_LONG_JMP.
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (OP_EXEC_DONE == pOperator->status) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
  SExprSupp*               pSup = &pOperator->exprSupp;

  if (pOperator->status != OP_RES_TO_RETURN) {
    const int32_t order = pInfo->binfo.inputTsOrder;
    SSDataBlock*  pBlock = NULL;

    pInfo->cleanGroupResInfo = false;
    while ((pBlock = getNextBlockFromDownstream(pOperator, 0)) != NULL) {
      pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
      pBInfo->pRes->info.dataLoad = 1;

      SExprSupp* pExprSup = &pInfo->scalarSupp;
      if (pExprSup->pExprInfo != NULL) {
        code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
                                     GET_STM_RTINFO(pOperator->pTaskInfo));
        QUERY_CHECK_CODE(code, lino, _end);
      }

      // the pDataBlock are always the same one, no need to call this again
      code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
      QUERY_CHECK_CODE(code, lino, _end);

      doSessionWindowAggImpl(pOperator, pInfo, pBlock);
    }

    // all input consumed: from now on only results are returned
    pOperator->status = OP_RES_TO_RETURN;

    code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->cleanGroupResInfo = true;

    code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  for (;;) {
    doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);

    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // skip blocks that were emptied by the filter
    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pBInfo->pRes->info.rows == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pBInfo->pRes;
  }
  return code;
}
1933

1934
// Reset-state callback of the state window operator: puts the operator back into the
// OP_NOT_OPENED state, releases the collected result rows, re-initialises the time window
// column and the aggregate / scalar expression supports from the physical plan node, and
// clears the remembered state key. The remaining resets run even after a failed step; the
// code of the failing step is returned.
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
  SStateWindowOperatorInfo* pInfo = pOper->info;
  SExecTaskInfo*            pTaskInfo = pOper->pTaskInfo;
  SStateWindowPhysiNode*    pStateNode = (SStateWindowPhysiNode*)pOper->pPhyNode;
  SOperatorInfo*            pOwner = NULL;

  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pInfo->binfo);
  pOwner = pInfo->pOperator;
  cleanupResultInfo(pOwner->pTaskInfo, &pOwner->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
                    pInfo->cleanGroupResInfo);

  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

  if (0 == code) {
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pStateNode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                       &pTaskInfo->storageAPI.functionStore);
  }

  if (0 == code) {
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pStateNode->window.pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  // forget the window state carried over from the previous execution
  pInfo->cleanGroupResInfo = false;
  pInfo->hasKey = false;
  pInfo->winSup.lastTs = INT64_MIN;
  cleanupGroupResInfo(&pInfo->groupResInfo);
  memset(pInfo->stateKey.pData, 0, pInfo->stateKey.bytes);

  return code;
}
1963

1964
// todo make this as an non-blocking operator
1965
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhysiNode* pStateNode,
1,072,126✔
1966
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1967
  QRY_PARAM_CHECK(pOptrInfo);
1,072,126✔
1968

1969
  int32_t                   code = TSDB_CODE_SUCCESS;
1,072,126✔
1970
  int32_t                   lino = 0;
1,072,126✔
1971
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
1,072,126✔
1972
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,072,126✔
1973
  if (pInfo == NULL || pOperator == NULL) {
1,072,126✔
UNCOV
1974
    code = terrno;
×
1975
    goto _error;
×
1976
  }
1977
  initOperatorCostInfo(pOperator);
1,072,126✔
1978

1979
  pOperator->pPhyNode = pStateNode;
1,072,126✔
1980
  pOperator->exprSupp.hasWindowOrGroup = true;
1,072,126✔
1981
  pOperator->exprSupp.hasWindow = true;
1,072,126✔
1982
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
1,072,126✔
1983
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
1,072,126✔
1984

1985
  if (pStateNode->window.pExprs != NULL) {
1,072,126✔
1986
    int32_t    numOfScalarExpr = 0;
298,056✔
1987
    SExprInfo* pScalarExprInfo = NULL;
298,056✔
1988
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
298,056✔
1989
    QUERY_CHECK_CODE(code, lino, _error);
298,056✔
1990

1991
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
298,056✔
1992
    if (code != TSDB_CODE_SUCCESS) {
298,056✔
1993
      goto _error;
×
1994
    }
1995
  }
1996

1997
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
1,072,126✔
1998
  pInfo->stateKey.type = pInfo->stateCol.type;
1,072,126✔
1999
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
1,072,126✔
2000
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
1,072,126✔
2001
  if (pInfo->stateKey.pData == NULL) {
1,072,126✔
2002
    goto _error;
×
2003
  }
2004
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
1,072,126✔
2005
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
1,072,126✔
2006

2007
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,072,126✔
2008
                            pTaskInfo->pStreamRuntimeInfo);
1,072,126✔
2009
  if (code != TSDB_CODE_SUCCESS) {
1,072,126✔
2010
    goto _error;
×
2011
  }
2012

2013
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,072,126✔
2014

2015
  int32_t    num = 0;
1,072,126✔
2016
  SExprInfo* pExprInfo = NULL;
1,072,126✔
2017
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
1,072,126✔
2018
  QUERY_CHECK_CODE(code, lino, _error);
1,072,126✔
2019

2020
  initResultSizeInfo(&pOperator->resultInfo, 4096);
1,072,126✔
2021

2022
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
1,072,126✔
2023
                    NULL, &pTaskInfo->storageAPI.functionStore);
2024
  if (code != TSDB_CODE_SUCCESS) {
1,072,126✔
2025
    goto _error;
×
2026
  }
2027

2028
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
1,072,126✔
2029
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,072,126✔
2030
  initBasicInfo(&pInfo->binfo, pResBlock);
1,072,126✔
2031
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1,072,126✔
2032

2033
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
1,072,126✔
2034
  QUERY_CHECK_CODE(code, lino, _error);
1,072,126✔
2035

2036
  pInfo->tsSlotId = tsSlotId;
1,072,126✔
2037
  pInfo->pOperator = pOperator;
1,072,126✔
2038
  pInfo->cleanGroupResInfo = false;
1,072,126✔
2039
  pInfo->extendOption = pStateNode->extendOption;
1,072,126✔
2040
  pInfo->trueForInfo.trueForType = pStateNode->trueForType;
1,072,126✔
2041
  pInfo->trueForInfo.count = pStateNode->trueForCount;
1,072,126✔
2042
  pInfo->trueForInfo.duration = pStateNode->trueForDuration;
1,072,126✔
2043
  pInfo->winSup.lastTs = INT64_MIN;
1,072,126✔
2044

2045
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
1,072,126✔
2046
                  pTaskInfo);
2047
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
1,072,126✔
2048
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2049
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
1,072,126✔
2050

2051
  code = appendDownstream(pOperator, &downstream, 1);
1,072,126✔
2052
  if (code != TSDB_CODE_SUCCESS) {
1,072,126✔
2053
    goto _error;
×
2054
  }
2055

2056
  *pOptrInfo = pOperator;
1,072,126✔
2057
  return TSDB_CODE_SUCCESS;
1,072,126✔
2058

2059
_error:
×
2060
  if (pInfo != NULL) {
×
2061
    destroyStateWindowOperatorInfo(pInfo);
×
2062
  }
2063

2064
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2065
  pTaskInfo->code = code;
×
2066
  return code;
×
2067
}
2068

2069
void destroySWindowOperatorInfo(void* param) {
587,252✔
2070
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
587,252✔
2071
  if (pInfo == NULL) {
587,252✔
2072
    return;
×
2073
  }
2074

2075
  cleanupBasicInfo(&pInfo->binfo);
587,252✔
2076
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
587,252✔
2077
  if (pInfo->pOperator) {
587,252✔
2078
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
587,252✔
2079
                      pInfo->cleanGroupResInfo);
587,252✔
2080
    pInfo->pOperator = NULL;
587,252✔
2081
  }
2082

2083
  cleanupAggSup(&pInfo->aggSup);
587,252✔
2084
  cleanupExprSupp(&pInfo->scalarSupp);
587,252✔
2085

2086
  cleanupGroupResInfo(&pInfo->groupResInfo);
587,252✔
2087
  taosMemoryFreeClear(param);
587,252✔
2088
}
2089

2090
// Puts a session-window operator back into its not-opened state so it can be executed again.
// Result buffers, the time-window column, the aggregate supporter and the scalar expression
// supporter are rebuilt from the physical plan node, and the window bookkeeping is cleared.
// Returns 0 on success or the first non-zero code reported by one of the re-init helpers.
// The trailing field resets are performed even when a helper failed.
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
  SSessionAggOperatorInfo* pSession = pOper->info;
  SSessionWinodwPhysiNode* pPlan = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
  SExecTaskInfo*           pTask = pOper->pTaskInfo;

  pOper->status = OP_NOT_OPENED;

  // drop everything produced by the previous execution
  resetBasicOperatorState(&pSession->binfo);
  cleanupResultInfo(pSession->pOperator->pTaskInfo, &pSession->pOperator->exprSupp, &pSession->groupResInfo,
                    &pSession->aggSup, pSession->cleanGroupResInfo);
  colDataDestroy(&pSession->twAggSup.timeWindowData);

  // rebuild the helpers one after another, stopping at the first failure
  int32_t rc = initExecTimeWindowInfo(&pSession->twAggSup.timeWindowData, &pTask->window);
  if (rc == 0) {
    rc = resetAggSup(&pOper->exprSupp, &pSession->aggSup, pTask, pPlan->window.pFuncs, NULL,
                     sizeof(int64_t) * 2 + POINTER_BYTES, pTask->id.str, NULL, &pTask->storageAPI.functionStore);
  }
  if (rc == 0) {
    rc = resetExprSupp(&pSession->scalarSupp, pTask, pPlan->window.pExprs, NULL, &pTask->storageAPI.functionStore);
  }

  // start over with an empty window and no previous timestamp
  pSession->cleanGroupResInfo = false;
  pSession->winSup = (SWindowRowsSup){0};
  pSession->winSup.prevTs = INT64_MIN;
  pSession->reptScan = false;

  cleanupGroupResInfo(&pSession->groupResInfo);
  return rc;
}
2120

2121
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
587,252✔
2122
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2123
  QRY_PARAM_CHECK(pOptrInfo);
587,252✔
2124

2125
  int32_t                  code = TSDB_CODE_SUCCESS;
587,252✔
2126
  int32_t                  lino = 0;
587,252✔
2127
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
587,252✔
2128
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
587,252✔
2129
  if (pInfo == NULL || pOperator == NULL) {
587,252✔
2130
    code = terrno;
×
2131
    goto _error;
×
2132
  }
2133
  initOperatorCostInfo(pOperator);
587,252✔
2134

2135
  pOperator->pPhyNode = pSessionNode;
587,252✔
2136
  pOperator->exprSupp.hasWindowOrGroup = true;
587,252✔
2137
  pOperator->exprSupp.hasWindow = true;
587,252✔
2138

2139
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
587,252✔
2140
  initResultSizeInfo(&pOperator->resultInfo, 4096);
587,252✔
2141

2142
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
587,252✔
2143
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
587,252✔
2144
  initBasicInfo(&pInfo->binfo, pResBlock);
587,252✔
2145

2146
  int32_t    numOfCols = 0;
587,252✔
2147
  SExprInfo* pExprInfo = NULL;
587,252✔
2148
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
587,252✔
2149
  QUERY_CHECK_CODE(code, lino, _error);
587,252✔
2150

2151
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
587,252✔
2152
                    NULL, &pTaskInfo->storageAPI.functionStore);
2153
  QUERY_CHECK_CODE(code, lino, _error);
587,252✔
2154

2155
  pInfo->gap = pSessionNode->gap;
587,252✔
2156

2157
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
587,252✔
2158
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
587,252✔
2159
  QUERY_CHECK_CODE(code, lino, _error);
587,252✔
2160

2161
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
587,252✔
2162
  pInfo->binfo.pRes = pResBlock;
587,252✔
2163
  pInfo->winSup.prevTs = INT64_MIN;
587,252✔
2164
  pInfo->reptScan = false;
587,252✔
2165
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
587,252✔
2166
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
587,252✔
2167

2168
  if (pSessionNode->window.pExprs != NULL) {
586,841✔
2169
    int32_t    numOfScalar = 0;
475✔
2170
    SExprInfo* pScalarExprInfo = NULL;
475✔
2171
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
475✔
2172
    QUERY_CHECK_CODE(code, lino, _error);
475✔
2173

2174
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
475✔
2175
    QUERY_CHECK_CODE(code, lino, _error);
475✔
2176
  }
2177

2178
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
586,841✔
2179
                            pTaskInfo->pStreamRuntimeInfo);
587,252✔
2180
  QUERY_CHECK_CODE(code, lino, _error);
586,841✔
2181

2182
  pInfo->pOperator = pOperator;
586,841✔
2183
  pInfo->cleanGroupResInfo = false;
587,252✔
2184
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
587,252✔
2185
                  pInfo, pTaskInfo);
2186
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
587,252✔
2187
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2188
  pOperator->pTaskInfo = pTaskInfo;
587,252✔
2189
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
587,252✔
2190

2191
  code = appendDownstream(pOperator, &downstream, 1);
586,841✔
2192
  QUERY_CHECK_CODE(code, lino, _error);
587,252✔
2193

2194
  *pOptrInfo = pOperator;
587,252✔
2195
  return TSDB_CODE_SUCCESS;
587,252✔
2196

2197
_error:
×
2198
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2199
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2200
  pTaskInfo->code = code;
×
2201
  return code;
×
2202
}
2203

2204
void destroyMAIOperatorInfo(void* param) {
819,825✔
2205
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
819,825✔
2206
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
819,825✔
2207
  taosMemoryFreeClear(param);
819,825✔
2208
}
819,825✔
2209

2210
// Allocates a fresh result row from the aggregate supporter's result buffer and, on success,
// records its page id / offset as the current position in pResultRowInfo.
// Returns the new row, or NULL when getNewResultRow could not provide one.
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
  SResultRow* pRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  if (pRow != NULL) {
    pResultRowInfo->cur = (SResultRowPosition){.pageId = pRow->pageId, .offset = pRow->offset};
  }
  return pRow;
}
2218

2219
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
943,410,651✔
2220
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2221
  if (*pResult == NULL) {
943,410,651✔
2222
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
719,052✔
2223
    if (*pResult == NULL) {
719,052✔
2224
      return terrno;
×
2225
    }
2226
  }
2227

2228
  // set time window for current result
2229
  (*pResult)->win = (*win);
943,411,902✔
2230
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
943,416,906✔
2231
}
2232

2233
// Aggregates one input block for the merge-aligned-interval operator.
//
// Every distinct timestamp in the block is treated as the start of its own window
// [ts, ts + interval - 1]. Consecutive rows that share the current timestamp are fed to the
// aggregate functions together; when the timestamp changes, the finished row is finalised
// into pResultBlock, the single reusable result row is reset, and a new window is started.
// The last window of the block is left open (miaInfo->curTs) so it can continue in the next block.
//
// Errors are reported by jumping out through T_LONG_JMP on the task's env.
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
  SInterval*     pInterval = &iaInfo->interval;

  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);

  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

  // there is an existing result
  if (miaInfo->curTs != INT64_MIN) {
    if (ts != miaInfo->curTs) {
      // the block starts a new window: flush the one carried over from the previous block
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
      miaInfo->curTs = ts;
    }
  } else {
    miaInfo->curTs = ts;
  }

  STimeWindow win = {0};
  win.skey = miaInfo->curTs;
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  int32_t currPos = startPos;

  STimeWindow currWin = win;
  while (++currPos < pBlock->info.rows) {
    if (tsCols[currPos] == miaInfo->curTs) {
      continue;
    }

    // timestamp changed: aggregate rows [startPos, currPos) into the current window
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
    miaInfo->curTs = tsCols[currPos];

    currWin.skey = miaInfo->curTs;
    currWin.ekey =
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

    startPos = currPos;
    // Stamp the row with the window that was just computed. Passing the block's first
    // window (`win`) here would leave a stale window in the result row.
    ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    miaInfo->curTs = currWin.skey;
  }

  // aggregate the trailing rows; their window stays open for the next block
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
3,885,766✔
2306

2307
// Called once the results of a group have been generated: tags the output block with the
// group id that was just finished, then clears the per-group state (no open window, group id 0).
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
  // no window is open any more
  pMiaInfo->curTs = INT64_MIN;

  // hand the finished group id to the output block before forgetting it
  pRes->info.id.groupId = pMiaInfo->groupId;
  pMiaInfo->groupId = 0;
}
2,568,204✔
2312

2313
// Pulls blocks from the downstream operator and aggregates them into the operator's result
// block until one of the following happens:
//   - the downstream is exhausted (the last open window is flushed, operator marked completed),
//   - a block of a different group arrives (the block is kept as prefetchedBlock, the current
//     group is flushed; if the filter leaves no rows, processing continues with the next group),
//   - the result block reached the configured capacity.
// Failures are reported by jumping out through T_LONG_JMP on the task's env.
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
  int32_t                               code = TSDB_CODE_SUCCESS;
  int32_t                               lino = 0;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;

  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
  SOperatorInfo*  downstream = pOperator->pDownstream[0];

  while (1) {
    SSDataBlock* pBlock = NULL;
    if (pMiaInfo->prefetchedBlock == NULL) {
      pBlock = getNextBlockFromDownstreamRemainDetach(pOperator, 0);
    } else {
      // resume with the block that ended the previous group
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;

      pMiaInfo->groupId = pBlock->info.id.groupId;
    }

    // no data exists, all query processing is done
    if (pBlock == NULL) {
      // close last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      setOperatorCompleted(pOperator);
      break;
    }

    if (pMiaInfo->groupId == 0) {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    } else {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        // if there is an unclosed time window, close it first.
        if (pMiaInfo->curTs == INT64_MIN) {
          // A group is active but no window is open: internal inconsistency. Jump with the
          // same code that is stored in the task; terrno is not set on this path.
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          pTaskInfo->code = code;
          T_LONG_JMP(pTaskInfo->env, code);
        }
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));

        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
        if (pRes->info.rows == 0) {
          // After filtering for last group, the result is empty, so we need to continue to process next group
          continue;
        } else {
          break;
        }
      } else {
        // continue
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
3,517,903✔
2403

2404
// "Get next" entry point of the merge-aligned-interval operator.
// Clears the operator's result block and refills it. When mergeResultBlock is set, the
// aggregation step is repeated until the operator is done or the block holds at least
// resultInfo.threshold rows; otherwise it runs exactly once.
// *ppRes receives the result block, or NULL when it is empty or the operator already finished.
// Always returns TSDB_CODE_SUCCESS.
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SMergeAlignedIntervalAggOperatorInfo* pMia = pOperator->info;
  SIntervalAggOperatorInfo*             pInterval = pMia->intervalAggOperatorInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pOut = pInterval->binfo.pRes;
  blockDataCleanup(pOut);

  if (!pInterval->binfo.mergeResultBlock) {
    doMergeAlignedIntervalAgg(pOperator);
  } else {
    // keep accumulating until finished or the block is large enough
    while (pOperator->status != OP_EXEC_DONE && pOut->info.rows < pOperator->resultInfo.threshold) {
      doMergeAlignedIntervalAgg(pOperator);
    }
  }

  if (pOut->info.rows == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pOut;
  }
  return TSDB_CODE_SUCCESS;
}
2436

2437
// Puts a merge-aligned-interval operator back into its initial state: no current group,
// no open window, no prefetched block and no result row, then delegates the reset of the
// embedded interval operator info to resetInterval and returns its result.
// NOTE(review): prefetchedBlock and pResultRow are only set to NULL here, not freed;
// presumably they are owned elsewhere - confirm.
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;

  pInfo->groupId = 0;
  pInfo->curTs = INT64_MIN;
  pInfo->prefetchedBlock = NULL;
  pInfo->pResultRow = NULL;

  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
}
2452

2453
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
819,825✔
2454
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2455
  QRY_PARAM_CHECK(pOptrInfo);
819,825✔
2456

2457
  int32_t                               code = TSDB_CODE_SUCCESS;
819,825✔
2458
  int32_t                               lino = 0;
819,825✔
2459
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
819,825✔
2460
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
819,825✔
2461
  if (miaInfo == NULL || pOperator == NULL) {
819,825✔
2462
    code = terrno;
×
2463
    goto _error;
×
2464
  }
2465
  initOperatorCostInfo(pOperator);
819,825✔
2466

2467
  pOperator->pPhyNode = pNode;
819,825✔
2468
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
819,825✔
2469
  if (miaInfo->intervalAggOperatorInfo == NULL) {
819,825✔
2470
    code = terrno;
×
2471
    goto _error;
×
2472
  }
2473

2474
  SInterval interval = {.interval = pNode->interval,
2,459,475✔
2475
                        .sliding = pNode->sliding,
819,825✔
2476
                        .intervalUnit = pNode->intervalUnit,
819,825✔
2477
                        .slidingUnit = pNode->slidingUnit,
819,825✔
2478
                        .offset = pNode->offset,
819,825✔
2479
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
819,825✔
2480
                        .timeRange = pNode->timeRange};
2481
  calcIntervalAutoOffset(&interval);
819,825✔
2482

2483
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
819,825✔
2484
  SExprSupp*                pSup = &pOperator->exprSupp;
819,825✔
2485
  pSup->hasWindowOrGroup = true;
819,825✔
2486
  pSup->hasWindow = true;
819,825✔
2487

2488
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
819,825✔
2489
                            pTaskInfo->pStreamRuntimeInfo);
819,825✔
2490
  QUERY_CHECK_CODE(code, lino, _error);
819,825✔
2491

2492
  miaInfo->curTs = INT64_MIN;
819,825✔
2493
  iaInfo->win = pTaskInfo->window;
819,825✔
2494
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
819,825✔
2495
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
819,825✔
2496
  iaInfo->interval = interval;
819,825✔
2497
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
819,825✔
2498
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
819,825✔
2499

2500
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
819,825✔
2501
  initResultSizeInfo(&pOperator->resultInfo, 512);
819,825✔
2502

2503
  int32_t    num = 0;
819,825✔
2504
  SExprInfo* pExprInfo = NULL;
819,825✔
2505
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
819,825✔
2506
  QUERY_CHECK_CODE(code, lino, _error);
819,825✔
2507

2508
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
819,825✔
2509
                    NULL, &pTaskInfo->storageAPI.functionStore);
2510
  QUERY_CHECK_CODE(code, lino, _error);
819,825✔
2511

2512
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
819,825✔
2513
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
819,825✔
2514
  initBasicInfo(&iaInfo->binfo, pResBlock);
819,825✔
2515
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
819,825✔
2516
  QUERY_CHECK_CODE(code, lino, _error);
819,825✔
2517

2518
  iaInfo->timeWindowInterpo = false;
819,825✔
2519
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
819,825✔
2520
  QUERY_CHECK_CODE(code, lino, _error);
819,825✔
2521
  if (iaInfo->timeWindowInterpo) {
819,825✔
2522
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2523
  }
2524

2525
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
819,825✔
2526
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
819,825✔
2527
  QUERY_CHECK_CODE(code, lino, _error);
819,825✔
2528
  iaInfo->pOperator = pOperator;
819,825✔
2529
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
819,825✔
2530
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2531

2532
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
819,825✔
2533
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2534
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
819,825✔
2535

2536
  code = appendDownstream(pOperator, &downstream, 1);
819,825✔
2537
  QUERY_CHECK_CODE(code, lino, _error);
819,825✔
2538

2539
  *pOptrInfo = pOperator;
819,825✔
2540
  return TSDB_CODE_SUCCESS;
819,825✔
2541

2542
_error:
×
2543
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2544
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2545
  pTaskInfo->code = code;
×
2546
  return code;
×
2547
}
2548

2549
//=====================================================================================================================
2550
// merge interval operator
2551
// Operator info of the merge interval operator.
typedef struct SMergeIntervalAggOperatorInfo {
2552
  // interval operator state, embedded by value (destroyed via its address)
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2553
  // list of SGroupTimeWindow entries: one (groupId, window) pair per appended window
  SList*                   groupIntervals;
2554
  // NOTE(review): presumably an iterator over groupIntervals used while emitting results - confirm
  SListIter                groupIntervalsIter;
2555
  // NOTE(review): presumably tells whether groupId below is valid - confirm
  bool                     hasGroupId;
2556
  uint64_t                 groupId;
2557
  // NOTE(review): presumably a block fetched ahead of time and not yet processed - confirm
  SSDataBlock*             prefetchedBlock;
2558
  // NOTE(review): presumably set once the downstream returned no more blocks - confirm
  bool                     inputBlocksFinished;
2559
} SMergeIntervalAggOperatorInfo;
2560

2561
// A time window tagged with the id of the table group it belongs to;
// this is the element type stored in SMergeIntervalAggOperatorInfo.groupIntervals.
typedef struct SGroupTimeWindow {
2562
  uint64_t    groupId;
2563
  STimeWindow window;
2564
} SGroupTimeWindow;
2565

2566
void destroyMergeIntervalOperatorInfo(void* param) {
×
2567
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2568
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2569
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2570

2571
  taosMemoryFreeClear(param);
×
2572
}
×
2573

2574
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2575
                                        STimeWindow* newWin) {
2576
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2577
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2578
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2579

2580
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2581
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2582
  if (code != TSDB_CODE_SUCCESS) {
×
2583
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2584
    return code;
×
2585
  }
2586

2587
  SListIter iter = {0};
×
2588
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2589
  SListNode* listNode = NULL;
×
2590
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2591
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2592
    if (prevGrpWin->groupId != tableGroupId) {
×
2593
      continue;
×
2594
    }
2595

2596
    STimeWindow* prevWin = &prevGrpWin->window;
×
2597
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2598
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2599
      taosMemoryFreeClear(tmp);
×
2600
    }
2601
  }
2602

2603
  return TSDB_CODE_SUCCESS;
×
2604
}
2605

2606
// Applies the aggregate functions of the merge-interval operator to one input block.
//
// The block is walked window by window: the window containing the first timestamp is handled
// first, then getNextQualifiedWindow is used to advance until it reports no further window
// (negative start position). Each window is closed after its rows were applied and is then
// registered through outputPrevIntervalResult.
//
// pOperatorInfo:  merge-interval operator (info is a SMergeIntervalAggOperatorInfo).
// pResultRowInfo: result row bookkeeping passed on to the window helpers.
// pBlock:         input block; its group id selects the table group.
// scanFlag:       scan flag of the block; compared against MAIN_SCAN when setting up a window.
// pResultBlock:   forwarded to outputPrevIntervalResult.
//
// Errors are not returned: every failure leaves the function through T_LONG_JMP on the task's
// jump environment with the failing code.
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
                                        iaInfo->binfo.inputTsOrder);

  int32_t ret =
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // A missing result row without an error code must not be reported with TSDB_CODE_SUCCESS.
    if (ret == TSDB_CODE_SUCCESS) {
      ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  // the boundary key of the window in scan direction
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 iaInfo->binfo.inputTsOrder);
  if (forwardRows <= 0) {
    // the window was derived from the first timestamp of the block, so it must hold at least one row
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  // prev time window not interpolation yet.
  if (iaInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        forwardRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
  doCloseWindow(pResultRowInfo, iaInfo, pResult);

  // output previous interval results after this interval (&win) is closed
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  STimeWindow nextWin = win;
  while (1) {
    // last row consumed by the window that was just processed
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      iaInfo->binfo.inputTsOrder);
    if (startPos < 0) {
      break;
    }

    // null data, failed to allocate more memory buffer
    code =
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      // same rule as for the first window: never jump out with TSDB_CODE_SUCCESS as the error code
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, code);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           iaInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                           forwardRows, pBlock->info.rows, numOfOutput);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    doCloseWindow(pResultRowInfo, iaInfo, pResult);

    // output previous interval results after this interval (&nextWin) is closed
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  // remember the last row of this block for interpolation
  if (iaInfo->timeWindowInterpo) {
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
  }
}
×
2720

2721
// "Get next" function of the merge-interval operator.
//
// Pulls blocks from downstream 0 and feeds them to doMergeIntervalAggImpl until one of:
//   - the downstream is exhausted (inputBlocksFinished is set and the group window iterator is
//     initialised),
//   - a block of a different group arrives (it is parked in prefetchedBlock and consumed first on
//     the next call),
//   - the result block has reached the operator's row threshold.
//
// pOperator: the merge-interval operator.
// ppRes:     receives the operator's result block, or NULL when that block holds no rows
//            (the operator is then marked completed) or the operator is already done.
//
// Returns TSDB_CODE_SUCCESS. On failure the code is stored in the task info and control leaves
// through T_LONG_JMP instead of returning.
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!miaInfo->inputBlocksFinished) {
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = getNextBlockFromDownstream(pOperator, 0);
      } else {
        // resume with the block that ended the previous call and switch to its group
        pBlock = miaInfo->prefetchedBlock;
        miaInfo->groupId = pBlock->info.id.groupId;
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
        // group changed: keep the block for the next call and return what was gathered so far
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

      pRes->info.scanFlag = pBlock->info.scanFlag;
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

    pRes->info.id.groupId = miaInfo->groupId;
  }

  if (miaInfo->inputBlocksFinished) {
    // after the input is drained, each call takes the group id from the next remembered window
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);

    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
      pRes->info.id.groupId = grpWin->groupId;
    }
  }

  if (pRes->info.rows == 0) {
    setOperatorCompleted(pOperator);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (pRes->info.rows == 0) ? NULL : pRes;
  return code;
}
2802

2803
// Reset-state callback of the merge-interval operator: puts the group tracking fields back to
// their initial values and then resets the embedded interval aggregation state.
//
// pOper: the merge-interval operator.
// Returns whatever resetInterval reports for the embedded interval info.
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
  SMergeIntervalAggOperatorInfo* pMergeInfo = pOper->info;

  // presumably discards every remembered group window - verify against the list API
  tdListEmpty(pMergeInfo->groupIntervals);

  // no input consumed yet, no block parked, no current group
  pMergeInfo->inputBlocksFinished = false;
  pMergeInfo->prefetchedBlock = NULL;
  pMergeInfo->groupId = 0;
  pMergeInfo->hasGroupId = false;

  return resetInterval(pOper, &pMergeInfo->intervalAggOperatorInfo);
}
2815

2816
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2817
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2818
  QRY_PARAM_CHECK(pOptrInfo);
×
2819

2820
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2821
  int32_t                        lino = 0;
×
2822
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2823
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2824
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2825
    code = terrno;
×
2826
    goto _error;
×
2827
  }
2828
  initOperatorCostInfo(pOperator);
×
2829

2830
  pOperator->pPhyNode = pIntervalPhyNode;
×
2831
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2832
                        .sliding = pIntervalPhyNode->sliding,
×
2833
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2834
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2835
                        .offset = pIntervalPhyNode->offset,
×
2836
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2837
                        .timeRange = pIntervalPhyNode->timeRange};
2838
  calcIntervalAutoOffset(&interval);
×
2839

2840
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2841

2842
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2843
  pIntervalInfo->win = pTaskInfo->window;
×
2844
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2845
  pIntervalInfo->interval = interval;
×
2846
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2847
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2848
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2849

2850
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2851
  pExprSupp->hasWindowOrGroup = true;
×
2852
  pExprSupp->hasWindow = true;
×
2853

2854
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2855
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2856

2857
  int32_t    num = 0;
×
2858
  SExprInfo* pExprInfo = NULL;
×
2859
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2860
  QUERY_CHECK_CODE(code, lino, _error);
×
2861

2862
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2863
                    NULL, &pTaskInfo->storageAPI.functionStore);
2864
  if (code != TSDB_CODE_SUCCESS) {
×
2865
    goto _error;
×
2866
  }
2867

2868
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2869
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2870
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2871
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2872
  QUERY_CHECK_CODE(code, lino, _error);
×
2873

2874
  pIntervalInfo->timeWindowInterpo = false;
×
2875
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2876
  QUERY_CHECK_CODE(code, lino, _error);
×
2877
  if (pIntervalInfo->timeWindowInterpo) {
×
2878
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2879
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2880
      goto _error;
×
2881
    }
2882
  }
2883

2884
  pIntervalInfo->pOperator = pOperator;
×
2885
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2886
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2887
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2888
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2889
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2890
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
2891

2892
  code = appendDownstream(pOperator, &downstream, 1);
×
2893
  if (code != TSDB_CODE_SUCCESS) {
×
2894
    goto _error;
×
2895
  }
2896

2897
  *pOptrInfo = pOperator;
×
2898
  return TSDB_CODE_SUCCESS;
×
2899
_error:
×
2900
  if (pMergeIntervalInfo != NULL) {
×
2901
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2902
  }
2903
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2904
  pTaskInfo->code = code;
×
2905
  return code;
×
2906
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc