• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4864

26 Nov 2025 05:46AM UTC coverage: 64.548% (+0.009%) from 64.539%
#4864

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

769 of 945 new or added lines in 33 files covered. (81.38%)

3006 existing lines in 116 files now uncovered.

158227 of 245129 relevant lines covered (64.55%)

111826500.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.32
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,
34
  RESULT_ROW_END_INTERP = 2,
35
} SResultTsInterpType;
36

37
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;
39
  uint64_t           groupId;
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
2,147,483,647✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
2,147,483,647✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
2,147,483,647✔
56
    *pResult = NULL;
163✔
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
×
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
2,147,483,647✔
63
  *pResult = pResultRow;
2,147,483,647✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
2,147,483,647✔
65
}
66

67
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, int32_t rowIndex, uint64_t groupId) {
2,147,483,647✔
68
  pRowSup->win.ekey = ts;
2,147,483,647✔
69
  pRowSup->prevTs = ts;
2,147,483,647✔
70
  pRowSup->groupId = groupId;
2,147,483,647✔
71
  pRowSup->numOfRows += 1;
2,147,483,647✔
72
  if (hasContinuousNullRows(pRowSup)) {
2,147,483,647✔
73
    // rows having null state col are wrapped by rows of same state
74
    // these rows can be counted into current window
75
    pRowSup->numOfRows += pRowSup->numNullRows;
66,135,204✔
76
    resetNumNullRows(pRowSup);
66,135,204✔
77
  }
78
}
2,147,483,647✔
79

80
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
827,789,760✔
81
  pRowSup->startRowIndex = rowIndex;
827,789,760✔
82
  pRowSup->numOfRows = 0;
827,789,760✔
83
  pRowSup->win.skey = tsList[rowIndex];
827,789,760✔
84
  pRowSup->groupId = groupId;
827,789,760✔
85
  resetNumNullRows(pRowSup);
827,789,760✔
86
}
827,789,760✔
87

88
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
×
89
                                            int32_t order, int64_t* pData) {
90
  int32_t forwardRows = 0;
2,147,483,647✔
91

92
  if (order == TSDB_ORDER_ASC) {
×
93
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
2,147,483,647✔
94
    if (end >= 0) {
2,147,483,647✔
95
      forwardRows = end;
2,147,483,647✔
96

97
      while (pData[end + pos] == ekey) {
2,147,483,647✔
98
        forwardRows += 1;
9,037,080✔
99
        ++pos;
9,037,080✔
100
      }
101
    }
102
  } else {
103
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
456,830,433✔
104
    if (end >= 0) {
456,901,426✔
105
      forwardRows = end;
457,076,146✔
106

107
      while (pData[end + pos] == ekey) {
907,014,818✔
108
        forwardRows += 1;
449,938,672✔
109
        ++pos;
449,938,672✔
110
      }
111
    }
112
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
113
    //    if (end >= 0) {
114
    //      forwardRows = pos - end;
115
    //
116
    //      if (pData[end] == ekey) {
117
    //        forwardRows += 1;
118
    //      }
119
    //    }
120
  }
121

122
  return forwardRows;
2,147,483,647✔
123
}
124

125
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
2,147,483,647✔
126
  int32_t midPos = -1;
2,147,483,647✔
127
  int32_t numOfRows;
128

129
  if (num <= 0) {
2,147,483,647✔
130
    return -1;
×
131
  }
132

133
  TSKEY*  keyList = (TSKEY*)pValue;
2,147,483,647✔
134
  int32_t firstPos = 0;
2,147,483,647✔
135
  int32_t lastPos = num - 1;
2,147,483,647✔
136

137
  if (order == TSDB_ORDER_DESC) {
2,147,483,647✔
138
    // find the first position which is smaller than the key
139
    while (1) {
140
      if (key >= keyList[firstPos]) return firstPos;
717,803,528✔
141
      if (key == keyList[lastPos]) return lastPos;
287,824,747✔
142

143
      if (key < keyList[lastPos]) {
280,551,210✔
144
        lastPos += 1;
6,888,035✔
145
        if (lastPos >= num) {
6,888,035✔
146
          return -1;
×
147
        } else {
148
          return lastPos;
6,888,035✔
149
        }
150
      }
151

152
      numOfRows = lastPos - firstPos + 1;
273,601,555✔
153
      midPos = (numOfRows >> 1) + firstPos;
273,601,555✔
154

155
      if (key < keyList[midPos]) {
273,601,555✔
156
        firstPos = midPos + 1;
14,113,746✔
157
      } else if (key > keyList[midPos]) {
259,586,869✔
158
        lastPos = midPos - 1;
246,446,208✔
159
      } else {
160
        break;
13,260,001✔
161
      }
162
    }
163

164
  } else {
165
    // find the first position which is bigger than the key
166
    while (1) {
167
      if (key <= keyList[firstPos]) return firstPos;
2,147,483,647✔
168
      if (key == keyList[lastPos]) return lastPos;
2,147,483,647✔
169

170
      if (key > keyList[lastPos]) {
2,147,483,647✔
171
        lastPos = lastPos + 1;
2,124,872,767✔
172
        if (lastPos >= num)
2,124,872,767✔
173
          return -1;
×
174
        else
175
          return lastPos;
2,124,872,767✔
176
      }
177

178
      numOfRows = lastPos - firstPos + 1;
2,147,483,647✔
179
      midPos = (numOfRows >> 1u) + firstPos;
2,147,483,647✔
180

181
      if (key < keyList[midPos]) {
2,147,483,647✔
182
        lastPos = midPos - 1;
2,147,483,647✔
183
      } else if (key > keyList[midPos]) {
1,305,139,762✔
184
        firstPos = midPos + 1;
1,289,441,885✔
185
      } else {
186
        break;
15,688,779✔
187
      }
188
    }
189
  }
190

191
  return midPos;
28,948,780✔
192
}
193

194
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
2,147,483,647✔
195
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
196
  int32_t num = -1;
2,147,483,647✔
197
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
2,147,483,647✔
198

199
  if (order == TSDB_ORDER_ASC) {
2,147,483,647✔
200
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
2,147,483,647✔
201
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
2,147,483,647✔
202
      if (item != NULL) {
2,147,483,647✔
203
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
204
      }
205
    } else {
206
      num = pDataBlockInfo->rows - startPos;
48,157,200✔
207
      if (item != NULL) {
48,896,361✔
208
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
209
      }
210
    }
211
  } else {  // desc
212
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
453,022,552✔
213
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
456,632,656✔
214
      if (item != NULL) {
456,939,036✔
215
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
216
      }
217
    } else {
218
      num = pDataBlockInfo->rows - startPos;
574,308✔
219
      if (item != NULL) {
1,188,067✔
220
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
221
      }
222
    }
223
  }
224

225
  return num;
2,147,483,647✔
226
}
227

228
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
11,056,985✔
229
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
230
  SqlFunctionCtx* pCtx = pSup->pCtx;
11,056,985✔
231

232
  int32_t index = 1;
11,056,985✔
233
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
33,176,619✔
234
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
22,119,634✔
235
      pCtx[k].start.key = INT64_MIN;
11,062,649✔
236
      continue;
11,062,649✔
237
    }
238

239
    SFunctParam*     pParam = &pCtx[k].param[0];
11,056,985✔
240
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
11,056,985✔
241

242
    double v1 = 0, v2 = 0, v = 0;
11,056,985✔
243
    if (prevRowIndex == -1) {
11,056,985✔
244
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
×
245
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
×
246
    } else {
247
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
11,056,985✔
248
                     typeGetTypeModFromColInfo(&pColInfo->info));
249
    }
250

251
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
11,056,985✔
252
                   typeGetTypeModFromColInfo(&pColInfo->info));
253

254
#if 0
255
    if (functionId == FUNCTION_INTERP) {
256
      if (type == RESULT_ROW_START_INTERP) {
257
        pCtx[k].start.key = prevTs;
258
        pCtx[k].start.val = v1;
259

260
        pCtx[k].end.key = curTs;
261
        pCtx[k].end.val = v2;
262

263
        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_VARBINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR ||
264
            pColInfo->info.type == TSDB_DATA_TYPE_GEOMETRY) {
265
          if (prevRowIndex == -1) {
266
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
267
          } else {
268
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
269
          }
270

271
          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
272
        }
273
      }
274
    } else if (functionId == FUNCTION_TWA) {
275
#endif
276

277
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
11,056,985✔
278
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
11,056,985✔
279
    SPoint point = (SPoint){.key = windowKey, .val = &v};
11,056,985✔
280

281
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
11,056,985✔
282
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
10,993,817✔
283
    }
284

285
    if (type == RESULT_ROW_START_INTERP) {
11,056,985✔
286
      pCtx[k].start.key = point.key;
5,495,687✔
287
      pCtx[k].start.val = v;
5,495,687✔
288
    } else {
289
      pCtx[k].end.key = point.key;
5,561,298✔
290
      pCtx[k].end.val = v;
5,561,298✔
291
    }
292

293
    index += 1;
11,056,985✔
294
  }
295
#if 0
296
  }
297
#endif
298
}
11,056,985✔
299

300
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
711,107✔
301
  if (type == RESULT_ROW_START_INTERP) {
711,107✔
302
    for (int32_t k = 0; k < numOfOutput; ++k) {
1,130,682✔
303
      pCtx[k].start.key = INT64_MIN;
742,323✔
304
    }
305
  } else {
306
    for (int32_t k = 0; k < numOfOutput; ++k) {
967,257✔
307
      pCtx[k].end.key = INT64_MIN;
644,509✔
308
    }
309
  }
310
}
711,107✔
311

312
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
5,884,046✔
313
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
314
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
5,884,046✔
315

316
  TSKEY curTs = tsCols[pos];
5,884,046✔
317

318
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
5,884,046✔
319
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
5,884,046✔
320

321
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
322
  // start exactly from this point, no need to do interpolation
323
  TSKEY key = ascQuery ? win->skey : win->ekey;
5,884,046✔
324
  if (key == curTs) {
5,884,046✔
325
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
384,768✔
326
    return true;
384,768✔
327
  }
328

329
  // it is the first time window, no need to do interpolation
330
  if (pTsKey->isNull && pos == 0) {
5,499,278✔
331
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
3,591✔
332
  } else {
333
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
5,495,687✔
334
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
5,495,687✔
335
                              RESULT_ROW_START_INTERP, pSup);
336
  }
337

338
  return true;
5,499,278✔
339
}
340

341
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
5,884,046✔
342
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
343
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
344
  int32_t code = TSDB_CODE_SUCCESS;
5,884,046✔
345
  int32_t lino = 0;
5,884,046✔
346
  int32_t order = pInfo->binfo.inputTsOrder;
5,884,046✔
347

348
  TSKEY actualEndKey = tsCols[endRowIndex];
5,884,046✔
349
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
5,884,046✔
350

351
  // not ended in current data block, do not invoke interpolation
352
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
5,884,046✔
353
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
18,194✔
354
    (*pRes) = false;
18,194✔
355
    return code;
18,194✔
356
  }
357

358
  // there is actual end point of current time window, no interpolation needs
359
  if (key == actualEndKey) {
5,865,852✔
360
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
304,554✔
361
    (*pRes) = true;
304,554✔
362
    return code;
304,554✔
363
  }
364

365
  if (nextRowIndex < 0) {
5,561,298✔
366
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
367
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
368
  }
369

370
  TSKEY nextKey = tsCols[nextRowIndex];
5,561,298✔
371
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
5,561,298✔
372
                            RESULT_ROW_END_INTERP, pSup);
373
  (*pRes) = true;
5,561,298✔
374
  return code;
5,561,298✔
375
}
376

377
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
2,147,483,647✔
378
  if (pInterval->interval != pInterval->sliding &&
2,147,483,647✔
379
      ((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart))) {
24,217,765✔
380
    return false;
×
381
  }
382

383
  return true;
2,147,483,647✔
384
}
385

386
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
2,147,483,647✔
387
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
2,147,483,647✔
388
}
389

390
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
2,147,483,647✔
391
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
392
  bool ascQuery = (order == TSDB_ORDER_ASC);
2,147,483,647✔
393

394
  int32_t precision = pInterval->precision;
2,147,483,647✔
395
  getNextTimeWindow(pInterval, pNext, order);
2,147,483,647✔
396

397
  // next time window is not in current block
398
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
2,147,483,647✔
399
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
2,147,483,647✔
400
    return -1;
43,016,214✔
401
  }
402

403
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
2,147,483,647✔
404
    return -1;
×
405
  }
406

407
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
2,147,483,647✔
408
  int32_t startPos = 0;
2,147,483,647✔
409

410
  // tumbling time window query, a special case of sliding time window query
411
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
2,147,483,647✔
412
    startPos = prevPosition + 1;
2,147,483,647✔
413
  } else {
414
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
22,380,821✔
415
      startPos = 0;
3,218,394✔
416
    } else {
417
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
20,999,371✔
418
    }
419
  }
420

421
  /* interp query with fill should not skip time window */
422
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
423
  //    return startPos;
424
  //  }
425

426
  /*
427
   * This time window does not cover any data, try next time window,
428
   * this case may happen when the time window is too small
429
   */
430
  if (primaryKeys != NULL) {
2,147,483,647✔
431
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
2,147,483,647✔
432
      TSKEY next = primaryKeys[startPos];
1,650,125,873✔
433
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
1,650,821,649✔
434
        pNext->skey = taosTimeTruncate(next, pInterval);
548✔
435
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
1,876✔
436
      } else {
437
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
1,652,178,921✔
438
        pNext->skey = pNext->ekey - pInterval->interval + 1;
1,652,280,349✔
439
      }
440
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
1,395,309,965✔
441
      TSKEY next = primaryKeys[startPos];
428,680,510✔
442
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
429,194,332✔
443
        pNext->skey = taosTimeTruncate(next, pInterval);
2✔
444
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
445
      } else {
446
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
428,930,912✔
447
        pNext->ekey = pNext->skey + pInterval->interval - 1;
429,545,251✔
448
      }
449
    }
450
  }
451

452
  return startPos;
2,147,483,647✔
453
}
454

455
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
17,652,138✔
456
  if (type == RESULT_ROW_START_INTERP) {
17,652,138✔
457
    return pResult->startInterp == true;
5,884,046✔
458
  } else {
459
    return pResult->endInterp == true;
11,768,092✔
460
  }
461
}
462

463
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
11,749,898✔
464
  if (type == RESULT_ROW_START_INTERP) {
11,749,898✔
465
    pResult->startInterp = true;
5,884,046✔
466
  } else {
467
    pResult->endInterp = true;
5,865,852✔
468
  }
469
}
11,749,898✔
470

471
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
2,147,483,647✔
472
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
473
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
474
  int32_t lino = 0;
2,147,483,647✔
475
  if (!pInfo->timeWindowInterpo) {
2,147,483,647✔
476
    return code;
2,147,483,647✔
477
  }
478

479
  if (pBlock == NULL) {
4,979,487✔
480
    code = TSDB_CODE_INVALID_PARA;
×
481
    return code;
×
482
  }
483

484
  if (pBlock->pDataBlock == NULL) {
4,979,487✔
485
    return code;
×
486
  }
487

488
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
5,884,046✔
489

490
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
5,884,046✔
491
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
5,884,046✔
492
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
5,884,046✔
493
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
5,884,046✔
494
    if (interp) {
5,884,046✔
495
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
5,884,046✔
496
    }
497
  } else {
498
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
×
499
  }
500

501
  // point interpolation does not require the end key time window interpolation.
502
  // interpolation query does not generate the time window end interpolation
503
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
5,884,046✔
504
  if (!done) {
5,884,046✔
505
    int32_t endRowIndex = startPos + forwardRows - 1;
5,884,046✔
506
    int32_t nextRowIndex = endRowIndex + 1;
5,884,046✔
507

508
    // duplicated ts row does not involve in the interpolation of end value for current time window
509
    int32_t x = endRowIndex;
5,884,046✔
510
    while (x > 0) {
5,897,167✔
511
      if (tsCols[x] == tsCols[x - 1]) {
5,883,977✔
512
        x -= 1;
13,121✔
513
      } else {
514
        endRowIndex = x;
5,870,856✔
515
        break;
5,870,856✔
516
      }
517
    }
518

519
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
5,884,046✔
520
    bool  interp = false;
5,884,046✔
521
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
5,884,046✔
522
                                           win, &interp);
523
    QUERY_CHECK_CODE(code, lino, _end);
5,884,046✔
524
    if (interp) {
5,884,046✔
525
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
5,865,852✔
526
    }
527
  } else {
528
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
529
  }
530

531
_end:
5,884,046✔
532
  if (code != TSDB_CODE_SUCCESS) {
5,884,046✔
533
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
534
  }
535
  return code;
5,884,046✔
536
}
537

538
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
18,194✔
539
  if (pBlock->pDataBlock == NULL) {
18,194✔
540
    return;
×
541
  }
542

543
  size_t num = taosArrayGetSize(pPrevKeys);
18,194✔
544
  for (int32_t k = 0; k < num; ++k) {
54,582✔
545
    SColumn* pc = taosArrayGet(pCols, k);
36,388✔
546

547
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);
36,388✔
548

549
    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
36,388✔
550
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
36,388✔
551
      if (colDataIsNull_s(pColInfo, i)) {
72,776✔
552
        continue;
×
553
      }
554

555
      char* val = colDataGetData(pColInfo, i);
36,388✔
556
      if (IS_VAR_DATA_TYPE(pkey->type)) {
36,388✔
557
        if (IS_STR_DATA_BLOB(pkey->type)) {
×
558
          memcpy(pkey->pData, val, blobDataTLen(val));
×
559
        } else {
560
          memcpy(pkey->pData, val, varDataTLen(val));
×
561
        }
562
      } else {
563
        memcpy(pkey->pData, val, pkey->bytes);
36,388✔
564
      }
565

566
      break;
36,388✔
567
    }
568
  }
569
}
570

571
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
18,194✔
572
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
573
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
18,194✔
574

575
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
18,194✔
576
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
18,194✔
577

578
  int32_t startPos = 0;
18,194✔
579
  int32_t numOfOutput = pSup->numOfExprs;
18,194✔
580

581
  SResultRow* pResult = NULL;
18,194✔
582

583
  while (1) {
×
584
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
18,194✔
585
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
18,194✔
586
    uint64_t            groupId = pOpenWin->groupId;
18,194✔
587
    SResultRowPosition* p1 = &pOpenWin->pos;
18,194✔
588
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
18,194✔
589
      break;
18,194✔
590
    }
591

592
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
593
    if (NULL == pr) {
×
594
      T_LONG_JMP(pTaskInfo->env, terrno);
×
595
    }
596

597
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
598
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
599
      T_LONG_JMP(pTaskInfo->env, terrno);
×
600
    }
601

602
    if (pr->closed) {
×
603
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
604
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
605
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
606
        T_LONG_JMP(pTaskInfo->env, terrno);
×
607
      }
608
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
609
      taosMemoryFree(pNode);
×
610
      continue;
×
611
    }
612

613
    STimeWindow w = pr->win;
×
614
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
615
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
616
    if (ret != TSDB_CODE_SUCCESS) {
×
617
      T_LONG_JMP(pTaskInfo->env, ret);
×
618
    }
619

620
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
621
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
622
      T_LONG_JMP(pTaskInfo->env, terrno);
×
623
    }
624

625
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
626
    if (!pTsKey) {
×
627
      pTaskInfo->code = terrno;
×
628
      T_LONG_JMP(pTaskInfo->env, terrno);
×
629
    }
630

631
    int64_t prevTs = *(int64_t*)pTsKey->pData;
×
632
    if (groupId == pBlock->info.id.groupId) {
×
633
      TSKEY curTs = pBlock->info.window.skey;
×
634
      if (tsCols != NULL) {
×
635
        curTs = tsCols[startPos];
×
636
      }
637
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
638
                                RESULT_ROW_END_INTERP, pSup);
639
    }
640

641
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
642
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
643

644
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
645
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
646
                                          pBlock->info.rows, numOfExprs);
×
647
    if (ret != TSDB_CODE_SUCCESS) {
×
648
      T_LONG_JMP(pTaskInfo->env, ret);
×
649
    }
650

651
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
652
      closeResultRow(pr);
×
653
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
654
      taosMemoryFree(pNode);
×
655
    } else {  // the remains are can not be closed yet.
656
      break;
×
657
    }
658
  }
659
}
18,194✔
660

661
static bool tsKeyCompFn(void* l, void* r, void* param) {
1,225,285,981✔
662
  TSKEY*                    lTS = (TSKEY*)l;
1,225,285,981✔
663
  TSKEY*                    rTS = (TSKEY*)r;
1,225,285,981✔
664
  SIntervalAggOperatorInfo* pInfo = param;
1,225,285,981✔
665
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
1,225,285,981✔
666
}
667

668
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
179,115,678✔
669
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
179,115,678✔
670
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
179,181,393✔
671
  return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
179,411,018✔
672
}
673

674
/**
675
 * @brief check if cur window should be filtered out by limit info
676
 * @retval true if should be filtered out
677
 * @retval false if not filtering out
678
 * @note If no limit info, we skip filtering.
679
 *       If input/output ts order mismatch, we skip filtering too.
680
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
681
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
682
 *       every tuple in every block.
683
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
684
 */
685
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
2,147,483,647✔
686
                                  SExecTaskInfo* pTaskInfo) {
687
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
688
  int32_t lino = 0;
2,147,483,647✔
689
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
2,147,483,647✔
690
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
369,734,084✔
691
      // if input/output ts order mismatch, no filter
692
  ) {
693
    return false;
2,147,483,647✔
694
  }
695

696
  if (pOperatorInfo->limit == 0) return true;
182,355,285✔
697

698
  if (pOperatorInfo->pBQ == NULL) {
182,313,413✔
699
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
935,550✔
700
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
935,745✔
701
  }
702

703
  bool shouldFilter = false;
182,337,593✔
704
  // if BQ has been full, compare it with top of BQ
705
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
182,337,593✔
706
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
45,209,491✔
707
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
45,200,911✔
708
  }
709
  if (shouldFilter) {
182,068,078✔
710
    return true;
2,941,200✔
711
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
179,126,878✔
712
    return false;
63,865,588✔
713
  }
714

715
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
716
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
115,458,070✔
717
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
115,460,300✔
718

719
  *((TSKEY*)node.data) = win->skey;
115,460,300✔
720

721
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
115,455,815✔
722
    taosMemoryFree(node.data);
×
723
    return true;
×
724
  }
725

726
_end:
115,545,125✔
727
  if (code != TSDB_CODE_SUCCESS) {
115,525,235✔
728
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
25,350✔
729
    pTaskInfo->code = code;
25,350✔
730
    T_LONG_JMP(pTaskInfo->env, code);
×
731
  }
732
  return false;
115,499,885✔
733
}
734

735
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
46,012,515✔
736
                            int32_t scanFlag) {
737
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
46,012,515✔
738

739
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
46,012,886✔
740
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
46,011,716✔
741

742
  int32_t     startPos = 0;
46,013,047✔
743
  int32_t     numOfOutput = pSup->numOfExprs;
46,013,047✔
744
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
46,012,642✔
745
  uint64_t    tableGroupId = pBlock->info.id.groupId;
46,011,257✔
746
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
46,011,257✔
747
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
46,012,226✔
748
  SResultRow* pResult = NULL;
46,011,403✔
749

750
  if (tableGroupId != pInfo->curGroupId) {
46,011,864✔
751
    pInfo->handledGroupNum += 1;
5,909,307✔
752
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
5,907,845✔
753
      return true;
29,130✔
754
    } else {
755
      pInfo->curGroupId = tableGroupId;
5,879,503✔
756
      destroyBoundedQueue(pInfo->pBQ);
5,879,984✔
757
      pInfo->pBQ = NULL;
5,879,043✔
758
    }
759
  }
760

761
  STimeWindow win =
45,981,729✔
762
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
45,980,975✔
763
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;
45,981,525✔
764

765
  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
43,386,899✔
766
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
767
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
43,387,511✔
768
    T_LONG_JMP(pTaskInfo->env, ret);
136✔
769
  }
770

771
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
43,387,585✔
772
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
43,387,585✔
773
                                                 pInfo->binfo.inputTsOrder);
774

775
  // prev time window not interpolation yet.
776
  if (pInfo->timeWindowInterpo) {
43,384,171✔
777
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
18,194✔
778
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
18,194✔
779

780
    // restore current time window
781
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
18,194✔
782
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
783
    if (ret != TSDB_CODE_SUCCESS) {
18,194✔
784
      T_LONG_JMP(pTaskInfo->env, ret);
×
785
    }
786

787
    // window start key interpolation
788
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
18,194✔
789
    if (ret != TSDB_CODE_SUCCESS) {
18,194✔
790
      T_LONG_JMP(pTaskInfo->env, ret);
×
791
    }
792
  }
793
  // qDebug("hashIntervalAgg1 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
794
  //   win.skey, win.ekey, startPos, forwardRows);
795
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
43,384,542✔
796
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
43,385,031✔
797
                                        pBlock->info.rows, numOfOutput);
43,385,184✔
798
  if (ret != TSDB_CODE_SUCCESS) {
43,381,491✔
799
    T_LONG_JMP(pTaskInfo->env, ret);
×
800
  }
801

802
  doCloseWindow(pResultRowInfo, pInfo, pResult);
43,381,491✔
803

804
  STimeWindow nextWin = win;
43,382,395✔
805
  while (1) {
2,147,483,647✔
806
    int32_t prevEndPos = forwardRows - 1 + startPos;
2,147,483,647✔
807
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
2,147,483,647✔
808
                                      pInfo->binfo.inputTsOrder);
809
    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
2,147,483,647✔
810
      break;
811
    }
812
    // null data, failed to allocate more memory buffer
813
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
2,147,483,647✔
814
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
815
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
2,147,483,647✔
816
      T_LONG_JMP(pTaskInfo->env, code);
1,336✔
817
    }
818

819
    // qDebug("hashIntervalAgg2 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
820
      // nextWin.skey, nextWin.ekey, startPos, forwardRows);
821

822
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
2,147,483,647✔
823
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
2,147,483,647✔
824
                                           pInfo->binfo.inputTsOrder);
825
    // window start(end) key interpolation
826
    code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
2,147,483,647✔
827
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
828
      T_LONG_JMP(pTaskInfo->env, code);
×
829
    }
830
    // TODO: add to open window? how to close the open windows after input blocks exhausted?
831
#if 0
832
    if ((ascScan && ekey <= pBlock->info.window.ekey) ||
833
        (!ascScan && ekey >= pBlock->info.window.skey)) {
834
      // window start(end) key interpolation
835
      doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
836
    } else if (pInfo->timeWindowInterpo) {
837
      addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
838
    }
839
#endif
840
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
2,147,483,647✔
841
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
2,147,483,647✔
842
                                          pBlock->info.rows, numOfOutput);
2,147,483,647✔
843
    if (ret != TSDB_CODE_SUCCESS) {
2,147,483,647✔
844
      T_LONG_JMP(pTaskInfo->env, ret);
×
845
    }
846
    doCloseWindow(pResultRowInfo, pInfo, pResult);
2,147,483,647✔
847
  }
848

849
  if (pInfo->timeWindowInterpo) {
41,834,867✔
850
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
18,194✔
851
  }
852
  return false;
43,385,517✔
853
}
854

855
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
2,147,483,647✔
856
  // current result is done in computing final results.
857
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
2,147,483,647✔
858
    closeResultRow(pResult);
5,865,852✔
859
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
5,865,852✔
860
    taosMemoryFree(pNode);
5,865,852✔
861
  }
862
}
2,147,483,647✔
863

864
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
18,194✔
865
                                       SExecTaskInfo* pTaskInfo) {
866
  int32_t         code = TSDB_CODE_SUCCESS;
18,194✔
867
  int32_t         lino = 0;
18,194✔
868
  SOpenWindowInfo openWin = {0};
18,194✔
869
  openWin.pos.pageId = pResult->pageId;
18,194✔
870
  openWin.pos.offset = pResult->offset;
18,194✔
871
  openWin.groupId = groupId;
18,194✔
872
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
18,194✔
873
  if (pn == NULL) {
18,194✔
874
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
18,194✔
875
    QUERY_CHECK_CODE(code, lino, _end);
18,194✔
876
    return openWin.pos;
18,194✔
877
  }
878

879
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
×
880
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
×
881
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
882
    QUERY_CHECK_CODE(code, lino, _end);
×
883
  }
884

885
_end:
×
886
  if (code != TSDB_CODE_SUCCESS) {
×
887
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
888
    pTaskInfo->code = code;
×
889
    T_LONG_JMP(pTaskInfo->env, code);
×
890
  }
891
  return openWin.pos;
×
892
}
893

894
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
47,734,651✔
895
  TSKEY* tsCols = NULL;
47,734,651✔
896

897
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
47,734,651✔
898
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
47,734,560✔
899
    if (!pColDataInfo) {
47,733,653✔
900
      pTaskInfo->code = terrno;
×
901
      T_LONG_JMP(pTaskInfo->env, terrno);
×
902
    }
903

904
    tsCols = (int64_t*)pColDataInfo->pData;
47,733,653✔
905
    if (tsCols[0] == 0) {
47,733,296✔
906
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
469✔
907
            tsCols[pBlock->info.rows - 1]);
908
    }
909

910
    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
47,734,998✔
911
      int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
3,437,850✔
912
      if (code != TSDB_CODE_SUCCESS) {
3,435,742✔
913
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
914
        pTaskInfo->code = code;
×
915
        T_LONG_JMP(pTaskInfo->env, code);
×
916
      }
917
    }
918
  }
919

920
  return tsCols;
47,733,425✔
921
}
922

923
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
11,200,296✔
924
  if (OPTR_IS_OPENED(pOperator)) {
11,200,296✔
925
    return TSDB_CODE_SUCCESS;
8,997,796✔
926
  }
927

928
  int32_t        code = TSDB_CODE_SUCCESS;
2,203,674✔
929
  int32_t        lino = 0;
2,203,674✔
930
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,203,674✔
931
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2,203,674✔
932

933
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
2,203,674✔
934
  SExprSupp*                pSup = &pOperator->exprSupp;
2,203,674✔
935

936
  int32_t scanFlag = MAIN_SCAN;
2,203,674✔
937
  int64_t st = taosGetTimestampUs();
2,203,674✔
938

939
  pInfo->cleanGroupResInfo = false;
2,203,674✔
940
  while (1) {
45,981,457✔
941
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
48,185,131✔
942
    if (pBlock == NULL) {
48,184,705✔
943
      break;
2,174,076✔
944
    }
945

946
    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
46,010,629✔
947

948
    if (pInfo->scalarSupp.pExprInfo != NULL) {
46,011,806✔
949
      SExprSupp* pExprSup = &pInfo->scalarSupp;
2,497,153✔
950
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
2,497,176✔
951
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
2,497,281✔
952
      QUERY_CHECK_CODE(code, lino, _end);
2,497,432✔
953
    }
954

955
    // the pDataBlock are always the same one, no need to call this again
956
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
46,010,451✔
957
    QUERY_CHECK_CODE(code, lino, _end);
46,012,597✔
958
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
46,012,597✔
959
  }
960

961
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
2,203,206✔
962
  QUERY_CHECK_CODE(code, lino, _end);
2,203,209✔
963
  pInfo->cleanGroupResInfo = true;
2,203,209✔
964

965
  OPTR_SET_OPENED(pOperator);
2,203,209✔
966

967
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
2,203,206✔
968

969
_end:
2,202,919✔
970
  if (code != TSDB_CODE_SUCCESS) {
2,202,919✔
971
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
972
    pTaskInfo->code = code;
×
973
    T_LONG_JMP(pTaskInfo->env, code);
×
974
  }
975
  return code;
2,202,919✔
976
}
977

978
// start a new state window and record the start info
979
void doKeepNewStateWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList,
169,618,840✔
980
  int32_t rowIndex, uint64_t groupId, const EStateWinExtendOption* extendOption, bool hasPrevWin) {
981
  pRowSup->groupId = groupId;
169,618,840✔
982
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT ||
169,618,995✔
983
      *extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
36,024✔
984
    pRowSup->win.skey = tsList[rowIndex];
169,601,239✔
985
    pRowSup->startRowIndex = rowIndex;
169,601,239✔
986
    pRowSup->numOfRows = 0;
169,601,239✔
987
  } else {
988
    pRowSup->win.skey = hasPrevWin ? pRowSup->win.ekey + 1 : tsList[0];
17,756✔
989
    pRowSup->startRowIndex = hasContinuousNullRows(pRowSup) ?
35,512✔
990
      rowIndex - pRowSup->numNullRows : rowIndex;
17,756✔
991
    pRowSup->numOfRows = rowIndex - pRowSup->startRowIndex;
17,756✔
992
  }
993
  resetNumNullRows(pRowSup);
169,618,995✔
994
}
169,618,995✔
995

996
// close a state window and record its end info
997
// this functions is called when a new state row appears
998
// @param rowIndex the index of the first row of next window
999
void doKeepCurStateWindowEndInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, 
174,456,810✔
1000
  int32_t rowIndex, const EStateWinExtendOption* extendOption, bool hasNextWin) {
1001
  if (*extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
174,456,810✔
1002
      pRowSup->win.ekey = hasNextWin? tsList[rowIndex] - 1 : tsList[rowIndex - 1];
24,096✔
1003
      // continuous rows having null state col should be included in this window
1004
      pRowSup->numOfRows += hasContinuousNullRows(pRowSup) ?
48,192✔
1005
        pRowSup->numNullRows : 0;
24,096✔
1006
      resetNumNullRows(pRowSup);
24,096✔
1007
  }
1008
}
174,456,810✔
1009

1010
void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, int32_t nullRowIndex) {
334,835,727✔
1011
  pRowSup->numNullRows += 1;
334,835,727✔
1012
}
334,835,727✔
1013

1014
// process a closed state window
1015
// do aggregation on the tuples within the window
1016
// partial aggregation results are stored in the output buffer
1017
static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo,
174,456,810✔
1018
  SWindowRowsSup* pRowSup, SExecTaskInfo* pTaskInfo,
1019
  SExprSupp* pSup, int32_t numOfOutput) {
1020
  int32_t     code = 0;
174,456,810✔
1021
  int32_t     lino = 0;
174,456,810✔
1022
  SResultRow* pResult = NULL;
174,456,810✔
1023
  code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win,
348,913,620✔
1024
    true, &pResult, pRowSup->groupId, pSup->pCtx, numOfOutput,
174,456,810✔
1025
    pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1026
  QUERY_CHECK_CODE(code, lino, _return);
174,456,810✔
1027

1028
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
174,456,810✔
1029
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx,
174,456,810✔
1030
    &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1031
    pRowSup->numOfRows, 0, numOfOutput);
1032
  QUERY_CHECK_CODE(code, lino, _return);
174,456,655✔
1033

1034
_return:
174,456,655✔
1035
  if (code != TSDB_CODE_SUCCESS) {
174,456,655✔
1036
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
512✔
1037
  }
1038
  return code;
174,456,655✔
1039
}
1040

1041
// process a data block for state window aggregation
1042
// scan from startIndex to endIndex
1043
// numPartialCalcRows returns the number of rows that have been
1044
// partially calculated within the block
1045
static void doStateWindowAggImpl(SOperatorInfo* pOperator,
11,514,582✔
1046
  SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t* startIndex,
1047
  int32_t* endIndex, int32_t* numPartialCalcRows) {
1048
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
11,514,582✔
1049
  SExprSupp*     pSup = &pOperator->exprSupp;
11,514,737✔
1050

1051
  SColumnInfoData* pStateColInfoData = 
1052
    taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
11,514,737✔
1053
  if (!pStateColInfoData) {
11,514,737✔
1054
    pTaskInfo->code = terrno;
×
1055
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1056
  }
1057
  int64_t gid = pBlock->info.id.groupId;
11,514,737✔
1058
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
11,514,737✔
1059
  int32_t bytes = pStateColInfoData->info.bytes;
11,514,582✔
1060

1061
  SColumnInfoData* pColInfoData =
1062
    taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
11,514,582✔
1063
  if (NULL == pColInfoData) {
11,514,582✔
1064
    pTaskInfo->code = terrno;
×
1065
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1066
  }
1067
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
11,514,582✔
1068

1069
  struct SColumnDataAgg* pAgg = NULL;
11,514,582✔
1070
  EStateWinExtendOption  extendOption = pInfo->extendOption;
11,514,582✔
1071
  SWindowRowsSup*        pRowSup = &pInfo->winSup;
11,514,582✔
1072
  for (int32_t j = *startIndex; j < *endIndex; ++j) {
803,452,515✔
1073
    if (pBlock->info.scanFlag != PRE_SCAN) {
791,959,871✔
1074
      if (pInfo->winSup.lastTs == INT64_MIN || gid != pRowSup->groupId || !pInfo->hasKey) {
791,889,604✔
1075
        pInfo->winSup.lastTs = tsList[j];
148,266,423✔
1076
      } else {
1077
        if (tsList[j] == pInfo->winSup.lastTs) {
643,623,181✔
1078
          // forbid duplicated ts rows
1079
          qError("%s:%d duplicated ts found in state window aggregation", __FILE__, __LINE__);
22,093✔
1080
          pTaskInfo->code = TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP;
22,093✔
1081
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP);
22,093✔
1082
        }
1083
      }
1084
    }
1085
    pAgg = (pBlock->pBlockAgg != NULL) ?
791,937,623✔
1086
      &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
791,937,623✔
1087
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
1,583,875,401✔
1088
      doKeepStateWindowNullInfo(pRowSup, j);
334,835,727✔
1089
      continue;
334,835,727✔
1090
    }
1091
    if (pStateColInfoData->pData == NULL) {
457,102,051✔
1092
      qError("%s:%d state column data is null", __FILE__, __LINE__);
×
1093
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1094
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1095
    }
1096

1097
    char* val = colDataGetData(pStateColInfoData, j);
457,101,896✔
1098

1099
    if (gid != pRowSup->groupId || !pInfo->hasKey) {
457,102,051✔
1100
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
3,011,746✔
1101
      pInfo->hasKey = true;
3,011,746✔
1102

1103
      doKeepNewStateWindowStartInfo(
3,011,746✔
1104
        pRowSup, tsList, j, gid, &extendOption, false);
1105
      doKeepTuple(pRowSup, tsList[j], j, gid);
3,011,746✔
1106
    } else if (compareVal(val, &pInfo->stateKey)) {
454,090,305✔
1107
      doKeepTuple(pRowSup, tsList[j], j, gid);
287,482,901✔
1108
    } else {
1109
      // close and process current state window
1110
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
166,607,249✔
1111
      int32_t code = processClosedStateWindow(
166,607,249✔
1112
        pInfo, pRowSup, pTaskInfo, pSup, numOfOutput);
1113
      if (TSDB_CODE_SUCCESS != code) {
166,607,094✔
1114
        T_LONG_JMP(pTaskInfo->env, code);
×
1115
      }
1116
      *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
166,607,094✔
1117
      
1118
      // start a new state window
1119
      doKeepNewStateWindowStartInfo(
166,607,094✔
1120
        pRowSup, tsList, j, gid, &extendOption, true);
1121
      doKeepTuple(pRowSup, tsList[j], j, gid);
166,607,249✔
1122

1123
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
166,607,249✔
1124
    }
1125
  }
1126

1127
  if (!pInfo->hasKey || 
11,492,489✔
1128
    (pRowSup->numOfRows == 0 && 
8,975,843✔
1129
    extendOption != STATE_WIN_EXTEND_OPTION_BACKWARD)) {
1,127,686✔
1130
    // if no valid state window or we don't know
1131
    // the belonging of these null rows,
1132
    // just return
1133
    return;
3,643,083✔
1134
  }
1135
  doKeepCurStateWindowEndInfo(pRowSup, tsList, *endIndex, &extendOption, false);
7,849,561✔
1136
  int32_t code = processClosedStateWindow(
7,849,561✔
1137
    pInfo, pRowSup, pTaskInfo, pSup, numOfOutput);
1138
  if (TSDB_CODE_SUCCESS != code) {
7,849,561✔
1139
    T_LONG_JMP(pTaskInfo->env, code);
512✔
1140
  }
1141
  *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
7,849,049✔
1142
  // reset pRowSup after doing agg calculation
1143
  pRowSup->startRowIndex = 0;
7,849,049✔
1144
  pRowSup->numOfRows = 0;
7,849,049✔
1145
}
1146

1147
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
676,248✔
1148
  if (OPTR_IS_OPENED(pOperator)) {
676,248✔
1149
    return TSDB_CODE_SUCCESS;
193,603✔
1150
  }
1151

1152
  int32_t                   code = TSDB_CODE_SUCCESS;
482,645✔
1153
  int32_t                   lino = 0;
482,645✔
1154
  SStateWindowOperatorInfo* pInfo = pOperator->info;
482,645✔
1155
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
482,645✔
1156

1157
  SExprSupp* pSup = &pOperator->exprSupp;
482,645✔
1158
  int32_t    order = pInfo->binfo.inputTsOrder;
482,645✔
1159
  int64_t    st = taosGetTimestampUs();
482,645✔
1160

1161
  SOperatorInfo* downstream = pOperator->pDownstream[0];
482,645✔
1162
  pInfo->cleanGroupResInfo = false;
482,645✔
1163

1164
  SSDataBlock* pUnfinishedBlock = NULL;
482,645✔
1165
  int32_t      startIndex = 0;
482,645✔
1166
  int32_t      endIndex = 0;
482,645✔
1167
  int32_t      numPartialCalcRows = 0;
482,645✔
1168
  while (1) {
11,492,132✔
1169
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
11,974,777✔
1170
    if (pBlock == NULL) {
11,974,393✔
1171
      if (pUnfinishedBlock != NULL) {
459,656✔
1172
        blockDataDestroy(pUnfinishedBlock);
29,521✔
1173
        pUnfinishedBlock = NULL;
29,521✔
1174
        resetWindowRowsSup(&pInfo->winSup);
29,521✔
1175
      }
1176
      break;
459,656✔
1177
    }
1178
    
1179
    // mark whether pUnfinishedBlock is a reference to pBlock
1180
    bool isRef = false;
11,514,737✔
1181
    startIndex = 0;
11,514,737✔
1182
    if (pUnfinishedBlock != NULL) {
11,514,737✔
1183
      startIndex = pUnfinishedBlock->info.rows;
5,545,098✔
1184
      // merge unfinished block with current block
1185
      code = blockDataMerge(pUnfinishedBlock, pBlock);
5,545,098✔
1186
      QUERY_CHECK_CODE(code, lino, _end);
5,545,098✔
1187
    } else {
1188
      pUnfinishedBlock = pBlock;
5,969,639✔
1189
      isRef = true;
5,969,639✔
1190
    }
1191
    endIndex = pUnfinishedBlock->info.rows;
11,514,737✔
1192

1193
    pInfo->binfo.pRes->info.scanFlag = pUnfinishedBlock->info.scanFlag;
11,514,737✔
1194
    code = setInputDataBlock(
11,514,737✔
1195
      pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
11,514,737✔
1196
    QUERY_CHECK_CODE(code, lino, _end);
11,514,737✔
1197

1198
    code = blockDataUpdateTsWindow(pUnfinishedBlock, pInfo->tsSlotId);
11,514,737✔
1199
    QUERY_CHECK_CODE(code, lino, _end);
11,514,737✔
1200

1201
    // there is an scalar expression that 
1202
    // needs to be calculated right before apply the group aggregation.
1203
    if (pInfo->scalarSup.pExprInfo != NULL) {
11,514,737✔
1204
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo,
376,243✔
1205
        pUnfinishedBlock, pUnfinishedBlock, pInfo->scalarSup.pCtx,
1206
        pInfo->scalarSup.numOfExprs, NULL,
1207
        GET_STM_RTINFO(pOperator->pTaskInfo));
376,243✔
1208
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
376,088✔
UNCOV
1209
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1210
      }
1211
    }
1212

1213
    doStateWindowAggImpl(pOperator, pInfo, pUnfinishedBlock, 
11,514,737✔
1214
      &startIndex, &endIndex, &numPartialCalcRows);
1215
    if (numPartialCalcRows < pUnfinishedBlock->info.rows) {
11,492,132✔
1216
      // save unfinished block for next round processing
1217
      if (isRef) {
5,574,619✔
1218
        code = createOneDataBlock(pBlock, true, &pUnfinishedBlock);
1,608,564✔
1219
        QUERY_CHECK_CODE(code, lino, _end);
1,608,436✔
1220
      }
1221
      code = blockDataTrimFirstRows(pUnfinishedBlock, numPartialCalcRows);
5,574,491✔
1222
      QUERY_CHECK_NULL(pUnfinishedBlock, code, lino, _end, terrno);
5,574,619✔
1223
    } else {
1224
      if (!isRef) {
5,917,513✔
1225
        blockDataDestroy(pUnfinishedBlock);
1,579,043✔
1226
      }
1227
      pUnfinishedBlock = NULL;
5,917,513✔
1228
    }
1229
    numPartialCalcRows = 0;
11,492,132✔
1230
  }
1231

1232
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
459,656✔
1233
  code = initGroupedResultInfo(
459,656✔
1234
    &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
1235
  QUERY_CHECK_CODE(code, lino, _end);
459,656✔
1236
  pInfo->cleanGroupResInfo = true;
459,656✔
1237
  pOperator->status = OP_RES_TO_RETURN;
459,656✔
1238

1239
_end:
459,656✔
1240
  if (code != TSDB_CODE_SUCCESS) {
459,656✔
UNCOV
1241
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1242
    pTaskInfo->code = code;
×
1243
    T_LONG_JMP(pTaskInfo->env, code);
×
1244
  }
1245
  return code;
459,656✔
1246
}
1247

1248
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,065,141✔
1249
  if (pOperator->status == OP_EXEC_DONE) {
1,065,141✔
1250
    (*ppRes) = NULL;
388,893✔
1251
    return TSDB_CODE_SUCCESS;
388,893✔
1252
  }
1253

1254
  int32_t                   code = TSDB_CODE_SUCCESS;
676,248✔
1255
  int32_t                   lino = 0;
676,248✔
1256
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
676,248✔
1257
  SStateWindowOperatorInfo* pInfo = pOperator->info;
676,248✔
1258
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;
676,248✔
1259

1260
  code = pOperator->fpSet._openFn(pOperator);
676,248✔
1261
  QUERY_CHECK_CODE(code, lino, _end);
653,259✔
1262

1263
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
653,259✔
1264
  QUERY_CHECK_CODE(code, lino, _end);
653,259✔
1265

UNCOV
1266
  while (1) {
×
1267
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
653,259✔
1268
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
653,259✔
1269
    QUERY_CHECK_CODE(code, lino, _end);
653,259✔
1270

1271
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
653,259✔
1272
    if (!hasRemain) {
653,259✔
1273
      setOperatorCompleted(pOperator);
437,196✔
1274
      break;
437,196✔
1275
    }
1276

1277
    if (pBInfo->pRes->info.rows > 0) {
216,063✔
1278
      break;
216,063✔
1279
    }
1280
  }
1281

1282
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
653,259✔
1283

1284
_end:
653,259✔
1285
  if (code != TSDB_CODE_SUCCESS) {
653,259✔
UNCOV
1286
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1287
    pTaskInfo->code = code;
×
1288
    T_LONG_JMP(pTaskInfo->env, code);
×
1289
  }
1290
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
653,259✔
1291
  return code;
653,259✔
1292
}
1293

1294
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
12,691,817✔
1295
  int32_t                   code = TSDB_CODE_SUCCESS;
12,691,817✔
1296
  int32_t                   lino = 0;
12,691,817✔
1297
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
12,691,817✔
1298
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
12,691,817✔
1299

1300
  if (pOperator->status == OP_EXEC_DONE) {
12,691,817✔
1301
    (*ppRes) = NULL;
1,490,347✔
1302
    return code;
1,490,347✔
1303
  }
1304

1305
  SSDataBlock* pBlock = pInfo->binfo.pRes;
11,201,470✔
1306
  code = pOperator->fpSet._openFn(pOperator);
11,201,470✔
1307
  QUERY_CHECK_CODE(code, lino, _end);
11,201,106✔
1308

1309
  while (1) {
2,055✔
1310
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
11,203,161✔
1311
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
11,202,592✔
1312
    QUERY_CHECK_CODE(code, lino, _end);
11,203,057✔
1313

1314
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
11,203,057✔
1315
    if (!hasRemain) {
11,203,060✔
1316
      setOperatorCompleted(pOperator);
2,194,176✔
1317
      break;
2,194,641✔
1318
    }
1319

1320
    if (pBlock->info.rows > 0) {
9,008,884✔
1321
      break;
9,006,829✔
1322
    }
1323
  }
1324

1325
  size_t rows = pBlock->info.rows;
11,201,470✔
1326
  pOperator->resultInfo.totalRows += rows;
11,200,355✔
1327

1328
_end:
11,200,820✔
1329
  if (code != TSDB_CODE_SUCCESS) {
11,200,820✔
UNCOV
1330
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1331
    pTaskInfo->code = code;
×
1332
    T_LONG_JMP(pTaskInfo->env, code);
×
1333
  }
1334
  (*ppRes) = (rows == 0) ? NULL : pBlock;
11,200,820✔
1335
  return code;
11,201,002✔
1336
}
1337

1338
static void destroyStateWindowOperatorInfo(void* param) {
462,789✔
1339
  if (param == NULL) {
462,789✔
UNCOV
1340
    return;
×
1341
  }
1342
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
462,789✔
1343
  cleanupBasicInfo(&pInfo->binfo);
462,789✔
1344
  taosMemoryFreeClear(pInfo->stateKey.pData);
462,789✔
1345
  if (pInfo->pOperator) {
462,789✔
1346
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
462,789✔
1347
                      pInfo->cleanGroupResInfo);
462,789✔
1348
    pInfo->pOperator = NULL;
462,789✔
1349
  }
1350

1351
  cleanupExprSupp(&pInfo->scalarSup);
462,789✔
1352
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
462,789✔
1353
  cleanupAggSup(&pInfo->aggSup);
462,789✔
1354
  cleanupGroupResInfo(&pInfo->groupResInfo);
462,789✔
1355

1356
  taosMemoryFreeClear(param);
462,789✔
1357
}
1358

1359
static void freeItem(void* param) {
34,072✔
1360
  SGroupKeys* pKey = (SGroupKeys*)param;
34,072✔
1361
  taosMemoryFree(pKey->pData);
34,072✔
1362
}
34,072✔
1363

1364
void destroyIntervalOperatorInfo(void* param) {
2,678,266✔
1365
  if (param == NULL) {
2,678,266✔
UNCOV
1366
    return;
×
1367
  }
1368

1369
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
2,678,266✔
1370

1371
  cleanupBasicInfo(&pInfo->binfo);
2,678,266✔
1372
  if (pInfo->pOperator) {
2,678,266✔
1373
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
2,678,266✔
1374
                      pInfo->cleanGroupResInfo);
2,678,266✔
1375
    pInfo->pOperator = NULL;
2,678,266✔
1376
  }
1377

1378
  cleanupAggSup(&pInfo->aggSup);
2,678,266✔
1379
  cleanupExprSupp(&pInfo->scalarSupp);
2,678,084✔
1380

1381
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
2,678,266✔
1382

1383
  taosArrayDestroy(pInfo->pInterpCols);
2,678,084✔
1384
  pInfo->pInterpCols = NULL;
2,678,084✔
1385

1386
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
2,678,266✔
1387
  pInfo->pPrevValues = NULL;
2,678,084✔
1388

1389
  cleanupGroupResInfo(&pInfo->groupResInfo);
2,678,084✔
1390
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
2,677,889✔
1391
  destroyBoundedQueue(pInfo->pBQ);
2,678,266✔
1392
  taosMemoryFreeClear(param);
2,678,071✔
1393
}
1394

1395
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
17,036✔
1396
  int32_t code = TSDB_CODE_SUCCESS;
17,036✔
1397
  int32_t lino = 0;
17,036✔
1398
  void*   tmp = NULL;
17,036✔
1399

1400
  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
17,036✔
1401
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
17,036✔
1402

1403
  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
17,036✔
1404
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
17,036✔
1405

1406
  {  // ts column
1407
    SColumn c = {0};
17,036✔
1408
    c.colId = 1;
17,036✔
1409
    c.slotId = pInfo->primaryTsIndex;
17,036✔
1410
    c.type = TSDB_DATA_TYPE_TIMESTAMP;
17,036✔
1411
    c.bytes = sizeof(int64_t);
17,036✔
1412
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
17,036✔
1413
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
17,036✔
1414

1415
    SGroupKeys key;
17,036✔
1416
    key.bytes = c.bytes;
17,036✔
1417
    key.type = c.type;
17,036✔
1418
    key.isNull = true;  // to denote no value is assigned yet
17,036✔
1419
    key.pData = taosMemoryCalloc(1, c.bytes);
17,036✔
1420
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
17,036✔
1421

1422
    tmp = taosArrayPush(pInfo->pPrevValues, &key);
17,036✔
1423
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
17,036✔
1424
  }
1425
_end:
17,036✔
1426
  if (code != TSDB_CODE_SUCCESS) {
17,036✔
UNCOV
1427
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1428
  }
1429
  return code;
17,036✔
1430
}
1431

1432
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
2,675,877✔
1433
                                      bool* pRes) {
1434
  // the primary timestamp column
1435
  bool    needed = false;
2,675,877✔
1436
  int32_t code = TSDB_CODE_SUCCESS;
2,675,877✔
1437
  int32_t lino = 0;
2,675,877✔
1438
  void*   tmp = NULL;
2,675,877✔
1439

1440
  for (int32_t i = 0; i < numOfCols; ++i) {
13,976,922✔
1441
    SExprInfo* pExpr = pCtx[i].pExpr;
11,323,171✔
1442
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
11,322,221✔
1443
      needed = true;
17,036✔
1444
      break;
17,036✔
1445
    }
1446
  }
1447

1448
  if (needed) {
2,670,787✔
1449
    code = initWindowInterpPrevVal(pInfo);
17,036✔
1450
    QUERY_CHECK_CODE(code, lino, _end);
17,036✔
1451
  }
1452

1453
  for (int32_t i = 0; i < numOfCols; ++i) {
13,997,215✔
1454
    SExprInfo* pExpr = pCtx[i].pExpr;
11,328,761✔
1455

1456
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
11,329,842✔
1457
      SFunctParam* pParam = &pExpr->base.pParam[0];
17,036✔
1458

1459
      SColumn c = *pParam->pCol;
17,036✔
1460
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
17,036✔
1461
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
17,036✔
1462

1463
      SGroupKeys key = {0};
17,036✔
1464
      key.bytes = c.bytes;
17,036✔
1465
      key.type = c.type;
17,036✔
1466
      key.isNull = false;
17,036✔
1467
      key.pData = taosMemoryCalloc(1, c.bytes);
17,036✔
1468
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
17,036✔
1469

1470
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
17,036✔
1471
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
17,036✔
1472
    }
1473
  }
1474

1475
_end:
2,668,454✔
1476
  if (code != TSDB_CODE_SUCCESS) {
2,669,772✔
UNCOV
1477
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1478
  }
1479
  *pRes = needed;
2,669,772✔
1480
  return code;
2,674,513✔
1481
}
1482

UNCOV
1483
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo){
×
UNCOV
1484
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
1485
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
×
1486
  pOper->status = OP_NOT_OPENED;
×
1487

1488
  resetBasicOperatorState(&pIntervalInfo->binfo);
×
UNCOV
1489
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp, &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup,
×
1490
    pIntervalInfo->cleanGroupResInfo);
×
1491

1492
  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
×
UNCOV
1493
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
1494
  if (code == 0) {
×
1495
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
1496
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
1497
                       &pTaskInfo->storageAPI.functionStore);
1498
  }
UNCOV
1499
  if (code == 0) {
×
UNCOV
1500
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
×
1501
                         &pTaskInfo->storageAPI.functionStore);
1502
  }
1503

UNCOV
1504
  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL){
×
UNCOV
1505
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
×
1506
  }
1507

UNCOV
1508
  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
×
UNCOV
1509
    pIntervalInfo->curGroupId = UINT64_MAX;
×
1510
  }
1511

UNCOV
1512
  pIntervalInfo->cleanGroupResInfo = false;
×
UNCOV
1513
  pIntervalInfo->handledGroupNum = 0;
×
1514
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
×
1515
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;
×
1516

1517
  taosArrayDestroy(pIntervalInfo->pInterpCols);
×
UNCOV
1518
  pIntervalInfo->pInterpCols = NULL;
×
1519

1520
  if (pIntervalInfo->pPrevValues != NULL) {
×
UNCOV
1521
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
×
1522
    pIntervalInfo->pPrevValues = NULL;
×
1523
    code = initWindowInterpPrevVal(pIntervalInfo);
×
1524
  }
1525

UNCOV
1526
  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
×
UNCOV
1527
  destroyBoundedQueue(pIntervalInfo->pBQ);
×
1528
  pIntervalInfo->pBQ = NULL;
×
1529
  return code;
×
1530
}
1531

UNCOV
1532
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
×
UNCOV
1533
  SIntervalAggOperatorInfo* pInfo = pOper->info;
×
1534
  return resetInterval(pOper, pInfo);
×
1535
}
1536

1537
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
2,202,471✔
1538
                                   SOperatorInfo** pOptrInfo) {
1539
  QRY_PARAM_CHECK(pOptrInfo);
2,202,471✔
1540

1541
  int32_t                   code = TSDB_CODE_SUCCESS;
2,203,479✔
1542
  int32_t                   lino = 0;
2,203,479✔
1543
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
2,203,479✔
1544
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,202,028✔
1545
  if (pInfo == NULL || pOperator == NULL) {
2,201,533✔
UNCOV
1546
    code = terrno;
×
UNCOV
1547
    lino = __LINE__;
×
1548
    goto _error;
×
1549
  }
1550

1551
  pOperator->pPhyNode = pPhyNode;
2,201,533✔
1552
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
2,201,233✔
1553
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,203,389✔
1554
  initBasicInfo(&pInfo->binfo, pResBlock);
2,203,389✔
1555

1556
  SExprSupp* pSup = &pOperator->exprSupp;
2,203,674✔
1557
  pSup->hasWindowOrGroup = true;
2,203,674✔
1558

1559
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
2,203,546✔
1560

1561
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,203,674✔
1562
  initResultSizeInfo(&pOperator->resultInfo, 512);
2,203,674✔
1563
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
2,203,049✔
1564
  QUERY_CHECK_CODE(code, lino, _error);
2,203,517✔
1565

1566
  int32_t    num = 0;
2,203,517✔
1567
  SExprInfo* pExprInfo = NULL;
2,203,517✔
1568
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
2,203,517✔
1569
  QUERY_CHECK_CODE(code, lino, _error);
2,203,674✔
1570

1571
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
2,203,674✔
1572
                    &pTaskInfo->storageAPI.functionStore);
1573
  QUERY_CHECK_CODE(code, lino, _error);
2,202,838✔
1574

1575
  SInterval interval = {.interval = pPhyNode->interval,
6,607,015✔
1576
                        .sliding = pPhyNode->sliding,
2,202,373✔
1577
                        .intervalUnit = pPhyNode->intervalUnit,
2,202,838✔
1578
                        .slidingUnit = pPhyNode->slidingUnit,
2,202,478✔
1579
                        .offset = pPhyNode->offset,
2,201,903✔
1580
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
2,202,769✔
1581
                        .timeRange = pPhyNode->timeRange};
1582
  calcIntervalAutoOffset(&interval);
2,201,571✔
1583

1584
  STimeWindowAggSupp as = {
6,607,978✔
1585
      .waterMark = pPhyNode->window.watermark,
2,202,377✔
1586
      .calTrigger = pPhyNode->window.triggerType,
2,202,469✔
1587
      .maxTs = INT64_MIN,
1588
  };
1589

1590
  pInfo->win = pTaskInfo->window;
2,202,603✔
1591
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
2,201,691✔
1592
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
2,201,107✔
1593
  pInfo->interval = interval;
2,201,766✔
1594
  pInfo->twAggSup = as;
2,200,803✔
1595
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
2,201,884✔
1596
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
2,201,511✔
1597
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
904,002✔
1598
    pInfo->limited = true;
904,392✔
1599
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
904,392✔
1600
  }
1601
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
2,201,697✔
1602
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
89,326✔
1603
    pInfo->slimited = true;
89,326✔
1604
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
89,326✔
1605
    pInfo->curGroupId = UINT64_MAX;
89,131✔
1606
  }
1607

1608
  if (pPhyNode->window.pExprs != NULL) {
2,202,068✔
1609
    int32_t    numOfScalar = 0;
68,659✔
1610
    SExprInfo* pScalarExprInfo = NULL;
68,659✔
1611
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
68,787✔
1612
    QUERY_CHECK_CODE(code, lino, _error);
68,965✔
1613

1614
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
68,965✔
1615
    if (code != TSDB_CODE_SUCCESS) {
69,102✔
UNCOV
1616
      goto _error;
×
1617
    }
1618
  }
1619

1620
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
2,201,365✔
1621
                            pTaskInfo->pStreamRuntimeInfo);
2,202,784✔
1622
  if (code != TSDB_CODE_SUCCESS) {
2,201,440✔
UNCOV
1623
    goto _error;
×
1624
  }
1625

1626
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
2,201,440✔
1627
  QUERY_CHECK_CODE(code, lino, _error);
2,199,687✔
1628

1629
  pInfo->timeWindowInterpo = false;
2,199,687✔
1630
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
2,199,291✔
1631
  QUERY_CHECK_CODE(code, lino, _error);
2,200,660✔
1632
  if (pInfo->timeWindowInterpo) {
2,200,660✔
1633
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
17,036✔
1634
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
17,036✔
UNCOV
1635
      goto _error;
×
1636
    }
1637
  }
1638

1639
  pInfo->pOperator = pOperator;
2,200,386✔
1640
  pInfo->cleanGroupResInfo = false;
2,201,356✔
1641
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
2,198,824✔
1642
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
2,200,426✔
1643
                  pInfo, pTaskInfo);
1644

1645
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
2,201,330✔
1646
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1647
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
2,201,663✔
1648
  code = appendDownstream(pOperator, &downstream, 1);
2,202,415✔
1649
  if (code != TSDB_CODE_SUCCESS) {
2,200,881✔
UNCOV
1650
    goto _error;
×
1651
  }
1652

1653
  *pOptrInfo = pOperator;
2,200,881✔
1654
  return TSDB_CODE_SUCCESS;
2,200,353✔
1655

UNCOV
1656
_error:
×
UNCOV
1657
  if (pInfo != NULL) {
×
1658
    destroyIntervalOperatorInfo(pInfo);
×
1659
  }
1660

UNCOV
1661
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
UNCOV
1662
  pTaskInfo->code = code;
×
1663
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
×
1664
  return code;
×
1665
}
1666

1667
// todo handle multiple timeline cases. assume no timeline interweaving
1668
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
5,912,126✔
1669
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
5,912,126✔
1670
  SExprSupp*     pSup = &pOperator->exprSupp;
5,912,126✔
1671

1672
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
5,912,126✔
1673
  if (!pColInfoData) {
5,912,126✔
UNCOV
1674
    pTaskInfo->code = terrno;
×
UNCOV
1675
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1676
  }
1677

1678
  bool    masterScan = true;
5,912,126✔
1679
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
5,912,126✔
1680
  int64_t gid = pBlock->info.id.groupId;
5,912,126✔
1681

1682
  int64_t gap = pInfo->gap;
5,912,126✔
1683

1684
  if (!pInfo->reptScan) {
5,912,126✔
1685
    pInfo->reptScan = true;
180,932✔
1686
    pInfo->winSup.prevTs = INT64_MIN;
180,932✔
1687
  }
1688

1689
  SWindowRowsSup* pRowSup = &pInfo->winSup;
5,912,126✔
1690
  pRowSup->numOfRows = 0;
5,912,126✔
1691
  pRowSup->startRowIndex = 0;
5,912,126✔
1692

1693
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
1694
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
5,912,126✔
1695
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
1,881,181,158✔
1696
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
1,875,269,032✔
1697
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
4,558,659✔
1698
      doKeepTuple(pRowSup, tsList[j], j, gid);
4,558,659✔
1699
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
1,870,710,373✔
1700
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
655,859,679✔
1701
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1702
      doKeepTuple(pRowSup, tsList[j], j, gid);
1,214,877,034✔
1703
    } else {  // start a new session window
1704
      // start a new session window
1705
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
655,833,339✔
1706
        SResultRow* pResult = NULL;
655,708,384✔
1707

1708
        // keep the time window for the closed time window.
1709
        STimeWindow window = pRowSup->win;
655,708,384✔
1710

1711
        int32_t ret =
1712
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
655,708,384✔
1713
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1714
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
655,708,384✔
UNCOV
1715
          T_LONG_JMP(pTaskInfo->env, ret);
×
1716
        }
1717

1718
        // pInfo->numOfRows data belong to the current session window
1719
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
655,708,384✔
1720
        ret =
1721
            applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
655,708,384✔
1722
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
655,708,384✔
1723
        if (ret != TSDB_CODE_SUCCESS) {
655,708,384✔
UNCOV
1724
          T_LONG_JMP(pTaskInfo->env, ret);
×
1725
        }
1726
      }
1727

1728
      // here we start a new session window
1729
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
655,833,339✔
1730
      doKeepTuple(pRowSup, tsList[j], j, gid);
655,833,339✔
1731
    }
1732
  }
1733

1734
  SResultRow* pResult = NULL;
5,912,126✔
1735
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
5,912,126✔
1736
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
5,912,126✔
1737
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1738
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
5,912,126✔
UNCOV
1739
    T_LONG_JMP(pTaskInfo->env, ret);
×
1740
  }
1741

1742
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
5,912,126✔
1743
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
5,912,126✔
1744
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
5,912,126✔
1745
  if (ret != TSDB_CODE_SUCCESS) {
5,912,126✔
UNCOV
1746
    T_LONG_JMP(pTaskInfo->env, ret);
×
1747
  }
1748
}
5,912,126✔
1749

1750
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,928,348✔
1751
  if (pOperator->status == OP_EXEC_DONE) {
1,928,348✔
1752
    (*ppRes) = NULL;
121,983✔
1753
    return TSDB_CODE_SUCCESS;
121,983✔
1754
  }
1755

1756
  int32_t                  code = TSDB_CODE_SUCCESS;
1,806,365✔
1757
  int32_t                  lino = 0;
1,806,365✔
1758
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,806,365✔
1759
  SSessionAggOperatorInfo* pInfo = pOperator->info;
1,806,365✔
1760
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
1,806,365✔
1761
  SExprSupp*               pSup = &pOperator->exprSupp;
1,806,365✔
1762

1763
  if (pOperator->status == OP_RES_TO_RETURN) {
1,806,365✔
UNCOV
1764
    while (1) {
×
1765
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,619,639✔
1766
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,619,639✔
1767
      QUERY_CHECK_CODE(code, lino, _end);
1,619,639✔
1768

1769
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,619,639✔
1770
      if (!hasRemain) {
1,619,639✔
1771
        setOperatorCompleted(pOperator);
14,160✔
1772
        break;
14,160✔
1773
      }
1774

1775
      if (pBInfo->pRes->info.rows > 0) {
1,605,479✔
1776
        break;
1,605,479✔
1777
      }
1778
    }
1779
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1,619,639✔
1780
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1,619,639✔
1781
    return code;
1,619,639✔
1782
  }
1783

1784
  int64_t st = taosGetTimestampUs();
186,726✔
1785
  int32_t order = pInfo->binfo.inputTsOrder;
186,726✔
1786

1787
  SOperatorInfo* downstream = pOperator->pDownstream[0];
186,726✔
1788

1789
  pInfo->cleanGroupResInfo = false;
186,726✔
1790
  while (1) {
5,912,126✔
1791
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
6,098,852✔
1792
    if (pBlock == NULL) {
6,098,852✔
1793
      break;
186,726✔
1794
    }
1795

1796
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
5,912,126✔
1797
    if (pInfo->scalarSupp.pExprInfo != NULL) {
5,912,126✔
1798
      SExprSupp* pExprSup = &pInfo->scalarSupp;
663✔
1799
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
663✔
1800
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
663✔
1801
      QUERY_CHECK_CODE(code, lino, _end);
663✔
1802
    }
1803
    // the pDataBlock are always the same one, no need to call this again
1804
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
5,912,126✔
1805
    QUERY_CHECK_CODE(code, lino, _end);
5,912,126✔
1806

1807
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
5,912,126✔
1808
    QUERY_CHECK_CODE(code, lino, _end);
5,912,126✔
1809

1810
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
5,912,126✔
1811
  }
1812

1813
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
186,726✔
1814

1815
  // restore the value
1816
  pOperator->status = OP_RES_TO_RETURN;
186,726✔
1817

1818
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
186,726✔
1819
  QUERY_CHECK_CODE(code, lino, _end);
186,726✔
1820
  pInfo->cleanGroupResInfo = true;
186,726✔
1821

1822
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
186,726✔
1823
  QUERY_CHECK_CODE(code, lino, _end);
186,726✔
UNCOV
1824
  while (1) {
×
1825
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
186,726✔
1826
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
186,726✔
1827
    QUERY_CHECK_CODE(code, lino, _end);
186,726✔
1828

1829
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
186,726✔
1830
    if (!hasRemain) {
186,726✔
1831
      setOperatorCompleted(pOperator);
130,007✔
1832
      break;
130,007✔
1833
    }
1834

1835
    if (pBInfo->pRes->info.rows > 0) {
56,719✔
1836
      break;
56,719✔
1837
    }
1838
  }
1839
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
186,726✔
1840

1841
_end:
186,726✔
1842
  if (code != TSDB_CODE_SUCCESS) {
186,726✔
UNCOV
1843
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1844
    pTaskInfo->code = code;
×
1845
    T_LONG_JMP(pTaskInfo->env, code);
×
1846
  }
1847
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
186,726✔
1848
  return code;
186,726✔
1849
}
1850

1851
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
22,440✔
1852
  SStateWindowOperatorInfo* pInfo = pOper->info;
22,440✔
1853
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
22,568✔
1854
  SStateWindowPhysiNode* pPhynode = (SStateWindowPhysiNode*)pOper->pPhyNode;
22,568✔
1855
  pOper->status = OP_NOT_OPENED;
22,440✔
1856

1857
  resetBasicOperatorState(&pInfo->binfo);
22,568✔
1858
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
22,568✔
1859
                    pInfo->cleanGroupResInfo);
22,568✔
1860

1861
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
22,568✔
1862
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
22,312✔
1863
  if (code == 0) {
22,312✔
1864
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
44,880✔
1865
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
22,312✔
1866
                       &pTaskInfo->storageAPI.functionStore);
1867
  }
1868
  if (code == 0) {
22,440✔
1869
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->window.pExprs, NULL,
22,440✔
1870
                         &pTaskInfo->storageAPI.functionStore);
1871
  }
1872

1873
  pInfo->cleanGroupResInfo = false;
22,568✔
1874
  pInfo->hasKey = false;
22,568✔
1875
  pInfo->winSup.lastTs = INT64_MIN;
22,312✔
1876
  cleanupGroupResInfo(&pInfo->groupResInfo);
22,312✔
1877
  memset(pInfo->stateKey.pData, 0, pInfo->stateKey.bytes);
22,440✔
1878
  return code;
22,568✔
1879
}
1880

1881
// todo make this as an non-blocking operator
1882
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhysiNode* pStateNode,
462,789✔
1883
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1884
  QRY_PARAM_CHECK(pOptrInfo);
462,789✔
1885

1886
  int32_t                   code = TSDB_CODE_SUCCESS;
462,789✔
1887
  int32_t                   lino = 0;
462,789✔
1888
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
462,789✔
1889
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
462,789✔
1890
  if (pInfo == NULL || pOperator == NULL) {
462,789✔
UNCOV
1891
    code = terrno;
×
UNCOV
1892
    goto _error;
×
1893
  }
1894

1895
  pOperator->pPhyNode = pStateNode;
462,789✔
1896
  pOperator->exprSupp.hasWindowOrGroup = true;
462,789✔
1897
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
462,789✔
1898
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
462,789✔
1899

1900
  if (pStateNode->window.pExprs != NULL) {
462,789✔
1901
    int32_t    numOfScalarExpr = 0;
311,021✔
1902
    SExprInfo* pScalarExprInfo = NULL;
311,021✔
1903
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
311,021✔
1904
    QUERY_CHECK_CODE(code, lino, _error);
311,021✔
1905

1906
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
311,021✔
1907
    if (code != TSDB_CODE_SUCCESS) {
311,021✔
UNCOV
1908
      goto _error;
×
1909
    }
1910
  }
1911

1912
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
462,789✔
1913
  pInfo->stateKey.type = pInfo->stateCol.type;
462,789✔
1914
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
462,789✔
1915
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
462,789✔
1916
  if (pInfo->stateKey.pData == NULL) {
462,789✔
UNCOV
1917
    goto _error;
×
1918
  }
1919
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
462,789✔
1920
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
462,789✔
1921

1922
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
462,789✔
1923
                            pTaskInfo->pStreamRuntimeInfo);
462,789✔
1924
  if (code != TSDB_CODE_SUCCESS) {
462,789✔
UNCOV
1925
    goto _error;
×
1926
  }
1927

1928
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
462,789✔
1929

1930
  int32_t    num = 0;
462,789✔
1931
  SExprInfo* pExprInfo = NULL;
462,789✔
1932
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
462,789✔
1933
  QUERY_CHECK_CODE(code, lino, _error);
462,789✔
1934

1935
  initResultSizeInfo(&pOperator->resultInfo, 4096);
462,789✔
1936

1937
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
925,578✔
1938
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
462,789✔
1939
  if (code != TSDB_CODE_SUCCESS) {
462,789✔
UNCOV
1940
    goto _error;
×
1941
  }
1942

1943
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
462,789✔
1944
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
462,789✔
1945
  initBasicInfo(&pInfo->binfo, pResBlock);
462,789✔
1946
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
462,789✔
1947

1948
  pInfo->twAggSup =
462,789✔
1949
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
462,789✔
1950

1951
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
462,789✔
1952
  QUERY_CHECK_CODE(code, lino, _error);
462,789✔
1953

1954
  pInfo->tsSlotId = tsSlotId;
462,789✔
1955
  pInfo->pOperator = pOperator;
462,789✔
1956
  pInfo->cleanGroupResInfo = false;
462,789✔
1957
  pInfo->extendOption = pStateNode->extendOption;
462,789✔
1958
  pInfo->trueForLimit = pStateNode->trueForLimit;
462,789✔
1959
  pInfo->winSup.lastTs = INT64_MIN;
462,789✔
1960

1961
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
462,789✔
1962
                  pTaskInfo);
1963
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
462,789✔
1964
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1965
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
462,789✔
1966

1967
  code = appendDownstream(pOperator, &downstream, 1);
462,789✔
1968
  if (code != TSDB_CODE_SUCCESS) {
462,789✔
UNCOV
1969
    goto _error;
×
1970
  }
1971

1972
  *pOptrInfo = pOperator;
462,789✔
1973
  return TSDB_CODE_SUCCESS;
462,789✔
1974

UNCOV
1975
_error:
×
UNCOV
1976
  if (pInfo != NULL) {
×
1977
    destroyStateWindowOperatorInfo(pInfo);
×
1978
  }
1979

UNCOV
1980
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
UNCOV
1981
  pTaskInfo->code = code;
×
1982
  return code;
×
1983
}
1984

1985
void destroySWindowOperatorInfo(void* param) {
186,726✔
1986
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
186,726✔
1987
  if (pInfo == NULL) {
186,726✔
UNCOV
1988
    return;
×
1989
  }
1990

1991
  cleanupBasicInfo(&pInfo->binfo);
186,726✔
1992
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
186,726✔
1993
  if (pInfo->pOperator) {
186,726✔
1994
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
186,726✔
1995
                      pInfo->cleanGroupResInfo);
186,726✔
1996
    pInfo->pOperator = NULL;
186,726✔
1997
  }
1998

1999
  cleanupAggSup(&pInfo->aggSup);
186,726✔
2000
  cleanupExprSupp(&pInfo->scalarSupp);
186,726✔
2001

2002
  cleanupGroupResInfo(&pInfo->groupResInfo);
186,726✔
2003
  taosMemoryFreeClear(param);
186,726✔
2004
}
2005

UNCOV
2006
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
×
UNCOV
2007
  SSessionAggOperatorInfo* pInfo = pOper->info;
×
2008
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
2009
  SSessionWinodwPhysiNode* pPhynode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
×
2010
  pOper->status = OP_NOT_OPENED;
×
2011

2012
  resetBasicOperatorState(&pInfo->binfo);
×
UNCOV
2013
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
×
2014
                    pInfo->cleanGroupResInfo);
×
2015

2016
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
×
UNCOV
2017
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
2018
  if (code == 0) {
×
2019
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
2020
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
2021
                       &pTaskInfo->storageAPI.functionStore);
2022
  }
UNCOV
2023
  if (code == 0) {
×
UNCOV
2024
    code = resetExprSupp(&pInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
×
2025
                         &pTaskInfo->storageAPI.functionStore);
2026
  }
2027

UNCOV
2028
  pInfo->cleanGroupResInfo = false;
×
UNCOV
2029
  pInfo->winSup = (SWindowRowsSup){0};
×
2030
  pInfo->winSup.prevTs = INT64_MIN;
×
2031
  pInfo->reptScan = false;
×
2032

2033
  cleanupGroupResInfo(&pInfo->groupResInfo);
×
UNCOV
2034
  return code;
×
2035
}
2036

2037
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
186,726✔
2038
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2039
  QRY_PARAM_CHECK(pOptrInfo);
186,726✔
2040

2041
  int32_t                  code = TSDB_CODE_SUCCESS;
186,726✔
2042
  int32_t                  lino = 0;
186,726✔
2043
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
186,726✔
2044
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
186,726✔
2045
  if (pInfo == NULL || pOperator == NULL) {
186,726✔
UNCOV
2046
    code = terrno;
×
UNCOV
2047
    goto _error;
×
2048
  }
2049

2050
  pOperator->pPhyNode = pSessionNode;
186,726✔
2051
  pOperator->exprSupp.hasWindowOrGroup = true;
186,726✔
2052

2053
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
186,726✔
2054
  initResultSizeInfo(&pOperator->resultInfo, 4096);
186,726✔
2055

2056
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
186,726✔
2057
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
186,726✔
2058
  initBasicInfo(&pInfo->binfo, pResBlock);
186,726✔
2059

2060
  int32_t    numOfCols = 0;
186,726✔
2061
  SExprInfo* pExprInfo = NULL;
186,726✔
2062
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
186,726✔
2063
  QUERY_CHECK_CODE(code, lino, _error);
186,726✔
2064

2065
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
373,452✔
2066
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
186,726✔
2067
  QUERY_CHECK_CODE(code, lino, _error);
186,726✔
2068

2069
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
186,726✔
2070
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
186,726✔
2071
  pInfo->gap = pSessionNode->gap;
186,726✔
2072

2073
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
186,726✔
2074
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
186,726✔
2075
  QUERY_CHECK_CODE(code, lino, _error);
186,726✔
2076

2077
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
186,726✔
2078
  pInfo->binfo.pRes = pResBlock;
186,726✔
2079
  pInfo->winSup.prevTs = INT64_MIN;
186,726✔
2080
  pInfo->reptScan = false;
186,726✔
2081
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
186,726✔
2082
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
186,726✔
2083

2084
  if (pSessionNode->window.pExprs != NULL) {
186,726✔
2085
    int32_t    numOfScalar = 0;
221✔
2086
    SExprInfo* pScalarExprInfo = NULL;
221✔
2087
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
221✔
2088
    QUERY_CHECK_CODE(code, lino, _error);
221✔
2089

2090
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
221✔
2091
    QUERY_CHECK_CODE(code, lino, _error);
221✔
2092
  }
2093

2094
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
186,726✔
2095
                            pTaskInfo->pStreamRuntimeInfo);
186,726✔
2096
  QUERY_CHECK_CODE(code, lino, _error);
186,726✔
2097

2098
  pInfo->pOperator = pOperator;
186,726✔
2099
  pInfo->cleanGroupResInfo = false;
186,726✔
2100
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
186,726✔
2101
                  pInfo, pTaskInfo);
2102
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
186,726✔
2103
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2104
  pOperator->pTaskInfo = pTaskInfo;
186,726✔
2105
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
186,726✔
2106

2107
  code = appendDownstream(pOperator, &downstream, 1);
186,726✔
2108
  QUERY_CHECK_CODE(code, lino, _error);
186,726✔
2109

2110
  *pOptrInfo = pOperator;
186,726✔
2111
  return TSDB_CODE_SUCCESS;
186,726✔
2112

UNCOV
2113
_error:
×
UNCOV
2114
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2115
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2116
  pTaskInfo->code = code;
×
2117
  return code;
×
2118
}
2119

2120
void destroyMAIOperatorInfo(void* param) {
474,592✔
2121
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
474,592✔
2122
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
474,592✔
2123
  taosMemoryFreeClear(param);
474,592✔
2124
}
474,592✔
2125

2126
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
420,056✔
2127
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
420,056✔
2128
  if (NULL == pResult) {
420,056✔
UNCOV
2129
    return pResult;
×
2130
  }
2131
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
420,056✔
2132
  return pResult;
420,056✔
2133
}
2134

2135
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
378,731,705✔
2136
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2137
  if (*pResult == NULL) {
378,731,705✔
2138
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
420,056✔
2139
    if (*pResult == NULL) {
420,056✔
UNCOV
2140
      return terrno;
×
2141
    }
2142
  }
2143

2144
  // set time window for current result
2145
  (*pResult)->win = (*win);
378,731,887✔
2146
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
378,731,705✔
2147
}
2148

2149
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
1,722,539✔
2150
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
2151
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
1,722,539✔
2152
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
1,722,539✔
2153

2154
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
1,722,539✔
2155
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
1,722,539✔
2156
  SInterval*     pInterval = &iaInfo->interval;
1,722,539✔
2157

2158
  int32_t  startPos = 0;
1,722,539✔
2159
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
1,722,539✔
2160

2161
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
1,722,539✔
2162

2163
  // there is an result exists
2164
  if (miaInfo->curTs != INT64_MIN) {
1,722,539✔
2165
    if (ts != miaInfo->curTs) {
628,679✔
2166
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
597,801✔
2167
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
597,801✔
2168
      miaInfo->curTs = ts;
597,801✔
2169
    }
2170
  } else {
2171
    miaInfo->curTs = ts;
1,093,860✔
2172
  }
2173

2174
  STimeWindow win = {0};
1,722,539✔
2175
  win.skey = miaInfo->curTs;
1,722,539✔
2176
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
1,722,539✔
2177

2178
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
1,722,539✔
2179
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
1,722,539✔
UNCOV
2180
    T_LONG_JMP(pTaskInfo->env, ret);
×
2181
  }
2182

2183
  int32_t currPos = startPos;
1,722,539✔
2184

2185
  STimeWindow currWin = win;
1,722,539✔
2186
  while (++currPos < pBlock->info.rows) {
874,026,662✔
2187
    if (tsCols[currPos] == miaInfo->curTs) {
872,296,479✔
2188
      continue;
495,295,503✔
2189
    }
2190

2191
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
377,006,800✔
2192
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
753,995,240✔
2193
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
377,007,892✔
2194
    if (ret != TSDB_CODE_SUCCESS) {
377,005,162✔
UNCOV
2195
      T_LONG_JMP(pTaskInfo->env, ret);
×
2196
    }
2197

2198
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
377,005,162✔
2199
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
377,008,074✔
2200
    miaInfo->curTs = tsCols[currPos];
377,008,256✔
2201

2202
    currWin.skey = miaInfo->curTs;
377,008,984✔
2203
    currWin.ekey =
377,008,984✔
2204
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
377,008,438✔
2205

2206
    startPos = currPos;
377,008,984✔
2207
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
377,008,984✔
2208
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
377,008,620✔
UNCOV
2209
      T_LONG_JMP(pTaskInfo->env, ret);
×
2210
    }
2211

2212
    miaInfo->curTs = currWin.skey;
377,008,620✔
2213
  }
2214

2215
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
1,722,539✔
2216
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
3,443,906✔
2217
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
1,722,539✔
2218
  if (ret != TSDB_CODE_SUCCESS) {
1,722,539✔
UNCOV
2219
    T_LONG_JMP(pTaskInfo->env, ret);
×
2220
  }
2221
}
1,722,539✔
2222

2223
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
1,088,471✔
2224
  pRes->info.id.groupId = pMiaInfo->groupId;
1,088,471✔
2225
  pMiaInfo->curTs = INT64_MIN;
1,088,471✔
2226
  pMiaInfo->groupId = 0;
1,088,471✔
2227
}
1,088,471✔
2228

2229
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
1,543,116✔
2230
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
1,543,116✔
2231
  int32_t                               code = TSDB_CODE_SUCCESS;
1,543,116✔
2232
  int32_t                               lino = 0;
1,543,116✔
2233
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
1,543,116✔
2234
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
1,543,116✔
2235

2236
  SExprSupp*      pSup = &pOperator->exprSupp;
1,543,116✔
2237
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
1,543,116✔
2238
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
1,543,116✔
2239
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
1,543,116✔
2240

2241
  while (1) {
1,322,430✔
2242
    SSDataBlock* pBlock = NULL;
2,865,546✔
2243
    if (pMiaInfo->prefetchedBlock == NULL) {
2,865,546✔
2244
      pBlock = getNextBlockFromDownstream(pOperator, 0);
2,191,742✔
2245
    } else {
2246
      pBlock = pMiaInfo->prefetchedBlock;
673,804✔
2247
      pMiaInfo->prefetchedBlock = NULL;
673,804✔
2248

2249
      pMiaInfo->groupId = pBlock->info.id.groupId;
673,804✔
2250
    }
2251

2252
    // no data exists, all query processing is done
2253
    if (pBlock == NULL) {
2,865,546✔
2254
      // close last unclosed time window
2255
      if (pMiaInfo->curTs != INT64_MIN) {
469,203✔
2256
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
414,667✔
2257
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
414,667✔
2258
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
414,667✔
2259
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
414,667✔
2260
        QUERY_CHECK_CODE(code, lino, _end);
414,667✔
2261
      }
2262

2263
      setOperatorCompleted(pOperator);
469,203✔
2264
      break;
469,203✔
2265
    }
2266

2267
    if (pMiaInfo->groupId == 0) {
2,396,343✔
2268
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
777,119✔
2269
        pMiaInfo->groupId = pBlock->info.id.groupId;
106,860✔
2270
        pRes->info.id.groupId = pMiaInfo->groupId;
106,860✔
2271
      }
2272
    } else {
2273
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
1,619,224✔
2274
        // if there are unclosed time window, close it firstly.
2275
        if (pMiaInfo->curTs == INT64_MIN) {
673,804✔
UNCOV
2276
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
2277
          T_LONG_JMP(pTaskInfo->env, terrno);
×
2278
        }
2279
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
673,804✔
2280
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
673,804✔
2281

2282
        pMiaInfo->prefetchedBlock = pBlock;
673,804✔
2283
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
673,804✔
2284
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
673,804✔
2285
        QUERY_CHECK_CODE(code, lino, _end);
673,804✔
2286
        if (pRes->info.rows == 0) {
673,804✔
2287
          // After filtering for last group, the result is empty, so we need to continue to process next group
2288
          continue;
4,658✔
2289
        } else {
2290
          break;
669,146✔
2291
        }
2292
      } else {
2293
        // continue
2294
        pRes->info.id.groupId = pMiaInfo->groupId;
945,420✔
2295
      }
2296
    }
2297

2298
    pRes->info.scanFlag = pBlock->info.scanFlag;
1,722,539✔
2299
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
1,722,539✔
2300
    QUERY_CHECK_CODE(code, lino, _end);
1,722,539✔
2301

2302
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
1,722,539✔
2303

2304
    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,722,539✔
2305
    QUERY_CHECK_CODE(code, lino, _end);
1,722,539✔
2306

2307
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
1,722,539✔
2308
      break;
404,767✔
2309
    }
2310
  }
2311

2312
_end:
1,543,116✔
2313
  if (code != TSDB_CODE_SUCCESS) {
1,543,116✔
UNCOV
2314
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
2315
    pTaskInfo->code = code;
×
2316
    T_LONG_JMP(pTaskInfo->env, code);
×
2317
  }
2318
}
1,543,116✔
2319

2320
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,779,803✔
2321
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
1,779,803✔
2322
  int32_t                               code = TSDB_CODE_SUCCESS;
1,779,803✔
2323
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
1,779,803✔
2324
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
1,779,803✔
2325
  if (pOperator->status == OP_EXEC_DONE) {
1,779,803✔
2326
    (*ppRes) = NULL;
430,417✔
2327
    return code;
430,417✔
2328
  }
2329

2330
  SSDataBlock* pRes = iaInfo->binfo.pRes;
1,349,386✔
2331
  blockDataCleanup(pRes);
1,349,386✔
2332

2333
  if (iaInfo->binfo.mergeResultBlock) {
1,349,386✔
2334
    while (1) {
2335
      if (pOperator->status == OP_EXEC_DONE) {
1,189,164✔
2336
        break;
174,367✔
2337
      }
2338

2339
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
1,014,797✔
2340
        break;
323,350✔
2341
      }
2342

2343
      doMergeAlignedIntervalAgg(pOperator);
691,447✔
2344
    }
2345
  } else {
2346
    doMergeAlignedIntervalAgg(pOperator);
851,669✔
2347
  }
2348

2349
  size_t rows = pRes->info.rows;
1,349,386✔
2350
  pOperator->resultInfo.totalRows += rows;
1,349,386✔
2351
  (*ppRes) = (rows == 0) ? NULL : pRes;
1,349,386✔
2352
  return code;
1,349,386✔
2353
}
2354

UNCOV
2355
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
×
UNCOV
2356
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;
×
2357
  
2358
  uint64_t     groupId;  // current groupId
2359
  int64_t      curTs;    // current ts
2360
  SSDataBlock* prefetchedBlock;
2361
  SResultRow*  pResultRow;
2362

UNCOV
2363
  pInfo->groupId = 0;
×
UNCOV
2364
  pInfo->curTs = INT64_MIN;
×
2365
  pInfo->prefetchedBlock = NULL;
×
2366
  pInfo->pResultRow = NULL;
×
2367

2368
  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
×
2369
}
2370

2371
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
474,592✔
2372
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2373
  QRY_PARAM_CHECK(pOptrInfo);
474,592✔
2374

2375
  int32_t                               code = TSDB_CODE_SUCCESS;
474,592✔
2376
  int32_t                               lino = 0;
474,592✔
2377
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
474,592✔
2378
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
474,592✔
2379
  if (miaInfo == NULL || pOperator == NULL) {
474,592✔
UNCOV
2380
    code = terrno;
×
UNCOV
2381
    goto _error;
×
2382
  }
2383

2384
  pOperator->pPhyNode = pNode;
474,592✔
2385
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
474,592✔
2386
  if (miaInfo->intervalAggOperatorInfo == NULL) {
474,592✔
UNCOV
2387
    code = terrno;
×
UNCOV
2388
    goto _error;
×
2389
  }
2390

2391
  SInterval interval = {.interval = pNode->interval,
1,421,432✔
2392
                        .sliding = pNode->sliding,
474,592✔
2393
                        .intervalUnit = pNode->intervalUnit,
474,592✔
2394
                        .slidingUnit = pNode->slidingUnit,
474,592✔
2395
                        .offset = pNode->offset,
474,592✔
2396
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
474,592✔
2397
                        .timeRange = pNode->timeRange};
2398
  calcIntervalAutoOffset(&interval);
474,592✔
2399

2400
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
474,592✔
2401
  SExprSupp*                pSup = &pOperator->exprSupp;
474,592✔
2402
  pSup->hasWindowOrGroup = true;
474,592✔
2403

2404
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
474,592✔
2405
                            pTaskInfo->pStreamRuntimeInfo);
474,592✔
2406
  QUERY_CHECK_CODE(code, lino, _error);
474,592✔
2407

2408
  miaInfo->curTs = INT64_MIN;
474,592✔
2409
  iaInfo->win = pTaskInfo->window;
474,592✔
2410
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
474,592✔
2411
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
474,592✔
2412
  iaInfo->interval = interval;
474,592✔
2413
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
474,592✔
2414
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
474,592✔
2415

2416
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
474,592✔
2417
  initResultSizeInfo(&pOperator->resultInfo, 512);
474,592✔
2418

2419
  int32_t    num = 0;
474,592✔
2420
  SExprInfo* pExprInfo = NULL;
474,592✔
2421
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
474,592✔
2422
  QUERY_CHECK_CODE(code, lino, _error);
474,592✔
2423

2424
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
948,012✔
2425
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
474,592✔
2426
  QUERY_CHECK_CODE(code, lino, _error);
474,592✔
2427

2428
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
474,592✔
2429
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
474,592✔
2430
  initBasicInfo(&iaInfo->binfo, pResBlock);
474,592✔
2431
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
474,592✔
2432
  QUERY_CHECK_CODE(code, lino, _error);
474,592✔
2433

2434
  iaInfo->timeWindowInterpo = false;
474,592✔
2435
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
474,592✔
2436
  QUERY_CHECK_CODE(code, lino, _error);
474,592✔
2437
  if (iaInfo->timeWindowInterpo) {
474,592✔
UNCOV
2438
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2439
  }
2440

2441
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
474,592✔
2442
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
474,592✔
2443
  QUERY_CHECK_CODE(code, lino, _error);
474,592✔
2444
  iaInfo->pOperator = pOperator;
474,592✔
2445
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
474,592✔
2446
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2447

2448
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
474,592✔
2449
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2450
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
474,592✔
2451

2452
  code = appendDownstream(pOperator, &downstream, 1);
474,592✔
2453
  QUERY_CHECK_CODE(code, lino, _error);
474,592✔
2454

2455
  *pOptrInfo = pOperator;
474,592✔
2456
  return TSDB_CODE_SUCCESS;
474,592✔
2457

UNCOV
2458
_error:
×
UNCOV
2459
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2460
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2461
  pTaskInfo->code = code;
×
2462
  return code;
×
2463
}
2464

2465
//=====================================================================================================================
2466
// merge interval operator
2467
typedef struct SMergeIntervalAggOperatorInfo {
2468
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2469
  SList*                   groupIntervals;
2470
  SListIter                groupIntervalsIter;
2471
  bool                     hasGroupId;
2472
  uint64_t                 groupId;
2473
  SSDataBlock*             prefetchedBlock;
2474
  bool                     inputBlocksFinished;
2475
} SMergeIntervalAggOperatorInfo;
2476

2477
typedef struct SGroupTimeWindow {
2478
  uint64_t    groupId;
2479
  STimeWindow window;
2480
} SGroupTimeWindow;
2481

UNCOV
2482
void destroyMergeIntervalOperatorInfo(void* param) {
×
UNCOV
2483
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2484
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2485
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2486

2487
  taosMemoryFreeClear(param);
×
UNCOV
2488
}
×
2489

2490
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2491
                                        STimeWindow* newWin) {
2492
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
UNCOV
2493
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2494
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2495

2496
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
UNCOV
2497
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2498
  if (code != TSDB_CODE_SUCCESS) {
×
2499
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2500
    return code;
×
2501
  }
2502

UNCOV
2503
  SListIter iter = {0};
×
UNCOV
2504
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2505
  SListNode* listNode = NULL;
×
2506
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2507
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2508
    if (prevGrpWin->groupId != tableGroupId) {
×
2509
      continue;
×
2510
    }
2511

UNCOV
2512
    STimeWindow* prevWin = &prevGrpWin->window;
×
UNCOV
2513
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2514
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2515
      taosMemoryFreeClear(tmp);
×
2516
    }
2517
  }
2518

UNCOV
2519
  return TSDB_CODE_SUCCESS;
×
2520
}
2521

UNCOV
2522
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
2523
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
2524
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
UNCOV
2525
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2526

2527
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
×
UNCOV
2528
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;
×
2529

2530
  int32_t     startPos = 0;
×
UNCOV
2531
  int32_t     numOfOutput = pExprSup->numOfExprs;
×
2532
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
×
2533
  uint64_t    tableGroupId = pBlock->info.id.groupId;
×
2534
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2535
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
×
2536
  SResultRow* pResult = NULL;
×
2537

2538
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
×
2539
                                        iaInfo->binfo.inputTsOrder);
2540

2541
  int32_t ret =
UNCOV
2542
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2543
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2544
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
×
UNCOV
2545
    T_LONG_JMP(pTaskInfo->env, ret);
×
2546
  }
2547

UNCOV
2548
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
×
UNCOV
2549
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2550
                                                 iaInfo->binfo.inputTsOrder);
2551
  if (forwardRows <= 0) {
×
UNCOV
2552
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2553
  }
2554

2555
  // prev time window not interpolation yet.
UNCOV
2556
  if (iaInfo->timeWindowInterpo) {
×
UNCOV
2557
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
×
2558
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
×
2559

2560
    // restore current time window
UNCOV
2561
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2562
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2563
    if (ret != TSDB_CODE_SUCCESS) {
×
UNCOV
2564
      T_LONG_JMP(pTaskInfo->env, ret);
×
2565
    }
2566

2567
    // window start key interpolation
UNCOV
2568
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
×
UNCOV
2569
    if (ret != TSDB_CODE_SUCCESS) {
×
2570
      T_LONG_JMP(pTaskInfo->env, ret);
×
2571
    }
2572
  }
2573

UNCOV
2574
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
×
UNCOV
2575
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2576
                                        forwardRows, pBlock->info.rows, numOfOutput);
×
2577
  if (ret != TSDB_CODE_SUCCESS) {
×
2578
    T_LONG_JMP(pTaskInfo->env, ret);
×
2579
  }
2580
  doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2581

2582
  // output previous interval results after this interval (&win) is closed
UNCOV
2583
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
×
UNCOV
2584
  if (code != TSDB_CODE_SUCCESS) {
×
2585
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2586
    T_LONG_JMP(pTaskInfo->env, code);
×
2587
  }
2588

UNCOV
2589
  STimeWindow nextWin = win;
×
UNCOV
2590
  while (1) {
×
2591
    int32_t prevEndPos = forwardRows - 1 + startPos;
×
2592
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
×
2593
                                      iaInfo->binfo.inputTsOrder);
2594
    if (startPos < 0) {
×
UNCOV
2595
      break;
×
2596
    }
2597

2598
    // null data, failed to allocate more memory buffer
2599
    code =
UNCOV
2600
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
×
2601
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2602
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
×
UNCOV
2603
      T_LONG_JMP(pTaskInfo->env, code);
×
2604
    }
2605

UNCOV
2606
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
×
UNCOV
2607
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2608
                                           iaInfo->binfo.inputTsOrder);
2609

2610
    // window start(end) key interpolation
UNCOV
2611
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
×
UNCOV
2612
    if (code != TSDB_CODE_SUCCESS) {
×
2613
      T_LONG_JMP(pTaskInfo->env, code);
×
2614
    }
2615

UNCOV
2616
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
×
UNCOV
2617
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2618
                                           forwardRows, pBlock->info.rows, numOfOutput);
×
2619
    if (code != TSDB_CODE_SUCCESS) {
×
2620
      T_LONG_JMP(pTaskInfo->env, code);
×
2621
    }
2622
    doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2623

2624
    // output previous interval results after this interval (&nextWin) is closed
UNCOV
2625
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
×
UNCOV
2626
    if (code != TSDB_CODE_SUCCESS) {
×
2627
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2628
      T_LONG_JMP(pTaskInfo->env, code);
×
2629
    }
2630
  }
2631

UNCOV
2632
  if (iaInfo->timeWindowInterpo) {
×
UNCOV
2633
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
×
2634
  }
2635
}
×
2636

2637
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
UNCOV
2638
  int32_t        code = TSDB_CODE_SUCCESS;
×
2639
  int32_t        lino = 0;
×
2640
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
2641

2642
  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
×
UNCOV
2643
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2644
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;
×
2645

2646
  if (pOperator->status == OP_EXEC_DONE) {
×
UNCOV
2647
    (*ppRes) = NULL;
×
2648
    return code;
×
2649
  }
2650

UNCOV
2651
  SSDataBlock* pRes = iaInfo->binfo.pRes;
×
UNCOV
2652
  blockDataCleanup(pRes);
×
2653
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
×
2654
  QUERY_CHECK_CODE(code, lino, _end);
×
2655

2656
  if (!miaInfo->inputBlocksFinished) {
×
UNCOV
2657
    SOperatorInfo* downstream = pOperator->pDownstream[0];
×
2658
    while (1) {
×
2659
      SSDataBlock* pBlock = NULL;
×
2660
      if (miaInfo->prefetchedBlock == NULL) {
×
2661
        pBlock = getNextBlockFromDownstream(pOperator, 0);
×
2662
      } else {
2663
        pBlock = miaInfo->prefetchedBlock;
×
UNCOV
2664
        miaInfo->groupId = pBlock->info.id.groupId;
×
2665
        miaInfo->prefetchedBlock = NULL;
×
2666
      }
2667

UNCOV
2668
      if (pBlock == NULL) {
×
UNCOV
2669
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
×
2670
        miaInfo->inputBlocksFinished = true;
×
2671
        break;
×
2672
      }
2673

UNCOV
2674
      if (!miaInfo->hasGroupId) {
×
UNCOV
2675
        miaInfo->hasGroupId = true;
×
2676
        miaInfo->groupId = pBlock->info.id.groupId;
×
2677
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
×
2678
        miaInfo->prefetchedBlock = pBlock;
×
2679
        break;
×
2680
      }
2681

UNCOV
2682
      pRes->info.scanFlag = pBlock->info.scanFlag;
×
UNCOV
2683
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
×
2684
      QUERY_CHECK_CODE(code, lino, _end);
×
2685

2686
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);
×
2687

2688
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
×
UNCOV
2689
        break;
×
2690
      }
2691
    }
2692

UNCOV
2693
    pRes->info.id.groupId = miaInfo->groupId;
×
2694
  }
2695

UNCOV
2696
  if (miaInfo->inputBlocksFinished) {
×
UNCOV
2697
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
×
2698

2699
    if (listNode != NULL) {
×
UNCOV
2700
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
×
2701
      pRes->info.id.groupId = grpWin->groupId;
×
2702
    }
2703
  }
2704

UNCOV
2705
  if (pRes->info.rows == 0) {
×
UNCOV
2706
    setOperatorCompleted(pOperator);
×
2707
  }
2708

UNCOV
2709
  size_t rows = pRes->info.rows;
×
UNCOV
2710
  pOperator->resultInfo.totalRows += rows;
×
2711

2712
_end:
×
UNCOV
2713
  if (code != TSDB_CODE_SUCCESS) {
×
2714
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2715
    pTaskInfo->code = code;
×
2716
    T_LONG_JMP(pTaskInfo->env, code);
×
2717
  }
2718
  (*ppRes) = (rows == 0) ? NULL : pRes;
×
UNCOV
2719
  return code;
×
2720
}
2721

UNCOV
2722
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
×
UNCOV
2723
  SMergeIntervalAggOperatorInfo* pInfo = pOper->info;
×
2724

2725
  pInfo->hasGroupId = false;
×
UNCOV
2726
  pInfo->groupId = 0;
×
2727
  pInfo->prefetchedBlock = NULL;
×
2728
  pInfo->inputBlocksFinished = false;
×
2729
  tdListEmpty(pInfo->groupIntervals);
×
2730
  
2731
  SIntervalAggOperatorInfo* pIntervalInfo = &pInfo->intervalAggOperatorInfo;
×
UNCOV
2732
  return resetInterval(pOper, pIntervalInfo);
×
2733
}
2734

UNCOV
2735
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2736
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2737
  QRY_PARAM_CHECK(pOptrInfo);
×
2738

2739
  int32_t                        code = TSDB_CODE_SUCCESS;
×
UNCOV
2740
  int32_t                        lino = 0;
×
2741
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2742
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2743
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2744
    code = terrno;
×
2745
    goto _error;
×
2746
  }
2747

UNCOV
2748
  pOperator->pPhyNode = pIntervalPhyNode;
×
UNCOV
2749
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2750
                        .sliding = pIntervalPhyNode->sliding,
×
2751
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2752
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2753
                        .offset = pIntervalPhyNode->offset,
×
2754
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2755
                        .timeRange = pIntervalPhyNode->timeRange};
2756
  calcIntervalAutoOffset(&interval);
×
2757

2758
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2759

2760
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
UNCOV
2761
  pIntervalInfo->win = pTaskInfo->window;
×
2762
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2763
  pIntervalInfo->interval = interval;
×
2764
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2765
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2766
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2767

2768
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
UNCOV
2769
  pExprSupp->hasWindowOrGroup = true;
×
2770

2771
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
UNCOV
2772
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2773

2774
  int32_t    num = 0;
×
UNCOV
2775
  SExprInfo* pExprInfo = NULL;
×
2776
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2777
  QUERY_CHECK_CODE(code, lino, _error);
×
2778

2779
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
UNCOV
2780
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2781
  if (code != TSDB_CODE_SUCCESS) {
×
2782
    goto _error;
×
2783
  }
2784

UNCOV
2785
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
UNCOV
2786
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2787
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2788
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2789
  QUERY_CHECK_CODE(code, lino, _error);
×
2790

2791
  pIntervalInfo->timeWindowInterpo = false;
×
UNCOV
2792
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2793
  QUERY_CHECK_CODE(code, lino, _error);
×
2794
  if (pIntervalInfo->timeWindowInterpo) {
×
2795
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2796
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2797
      goto _error;
×
2798
    }
2799
  }
2800

UNCOV
2801
  pIntervalInfo->pOperator = pOperator;
×
UNCOV
2802
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2803
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2804
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2805
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2806
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2807
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
2808

2809
  code = appendDownstream(pOperator, &downstream, 1);
×
UNCOV
2810
  if (code != TSDB_CODE_SUCCESS) {
×
2811
    goto _error;
×
2812
  }
2813

UNCOV
2814
  *pOptrInfo = pOperator;
×
UNCOV
2815
  return TSDB_CODE_SUCCESS;
×
2816
_error:
×
2817
  if (pMergeIntervalInfo != NULL) {
×
2818
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2819
  }
2820
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
UNCOV
2821
  pTaskInfo->code = code;
×
2822
  return code;
×
2823
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc