• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4423

04 Jul 2025 08:20AM UTC coverage: 63.643% (+2.6%) from 61.007%
#4423

push

travis-ci

GitHub
Merge pull request #31575 from taosdata/fix/huoh/taos_log

160637 of 321218 branches covered (50.01%)

Branch coverage included in aggregate %.

247567 of 320175 relevant lines covered (77.32%)

16621050.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.0
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
// Selects which border of a time window an interpolation step refers to: the helpers in this file
// use it to pick between the start and end interpolation state of a result row / function context.
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,  // the window start border
34
  RESULT_ROW_END_INTERP = 2,    // the window end border
35
} SResultTsInterpType;
36

37
// Payload of a node in the open-window list: identifies one window that has not been closed yet.
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;      // position (page id / offset) of the window's result row
39
  uint64_t           groupId;  // group the window belongs to
40
} SOpenWindowInfo;
41

42
// Forward declarations of file-local (static) helpers that are referenced before their definitions.
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
209,811,567✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
209,811,567✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
210,646,577!
56
    *pResult = NULL;
×
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
1✔
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
210,767,257✔
63
  *pResult = pResultRow;
210,767,257✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
210,767,257✔
65
}
66

67
// Account for one more row in the tracked row span: `ts` becomes both the span's end key and the
// remembered previous timestamp, and the span is tagged with `groupId`.
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->numOfRows++;
  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
}
73

74
// Begin a fresh row span at `rowIndex`: the span starts empty, its start key is the timestamp of that
// row, and it is tagged with `groupId`.
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
  const int64_t firstTs = tsList[rowIndex];

  pRowSup->groupId = groupId;
  pRowSup->win.skey = firstTs;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = rowIndex;
}
81

82
/**
 * @brief Count how many rows, starting at `pos`, can be consumed up to and including key `ekey`.
 *
 * `searchFn` is run over the rows [pos, numOfRows); its result is taken as the number of rows that lie
 * before the located position. Rows from that position on whose timestamp equals `ekey` are counted too.
 *
 * @param numOfRows  number of rows in `pData`
 * @param searchFn   search routine; a negative result means nothing was found
 * @param ekey       boundary key of the window
 * @param pos        first row to consider
 * @param order      scan order, forwarded to `searchFn` (both orders are handled identically here)
 * @param pData      timestamp column of the block
 * @return number of rows to move forward, 0 when `searchFn` finds no position
 */
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
  int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
  if (end < 0) {
    return 0;
  }

  int32_t forwardRows = end;

  // include the rows that sit exactly on the boundary key; never scan past the end of the block
  for (int32_t idx = pos + end; idx < numOfRows && pData[idx] == ekey; ++idx) {
    forwardRows += 1;
  }

  return forwardRows;
}
118

119
/**
 * @brief Binary search for `key` in an ordered array of `num` timestamps.
 *
 * For TSDB_ORDER_DESC the array is searched as a descending list and the result is a position whose
 * value is not larger than `key`; for any other order the array is searched as an ascending list and
 * the result is a position whose value is not smaller than `key`.
 *
 * @param pValue  the timestamp array, passed as raw bytes
 * @return the located position, or -1 when `num <= 0` or no position qualifies
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  const TSKEY* keys = (TSKEY*)pValue;
  const bool   desc = (order == TSDB_ORDER_DESC);
  int32_t      lo = 0;
  int32_t      hi = num - 1;

  for (;;) {
    // the key is already reached by the first candidate, or sits exactly on the last one
    if (desc ? (key >= keys[lo]) : (key <= keys[lo])) {
      return lo;
    }
    if (key == keys[hi]) {
      return hi;
    }

    // the key lies beyond the last candidate: the answer is the slot right after it, if there is one
    if (desc ? (key < keys[hi]) : (key > keys[hi])) {
      hi += 1;
      return (hi >= num) ? -1 : hi;
    }

    int32_t mid = ((hi - lo + 1) >> 1) + lo;
    if (key == keys[mid]) {
      return mid;
    }

    // keep the half that can still contain the key
    bool moveRight = desc ? (key < keys[mid]) : (key > keys[mid]);
    if (moveRight) {
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
}
187

188
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
196,689,526✔
189
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
190
  int32_t num = -1;
196,689,526✔
191
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
196,689,526✔
192

193
  if (order == TSDB_ORDER_ASC) {
196,689,526✔
194
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
160,127,561!
195
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
156,438,335!
196
      if (item != NULL) {
155,439,150!
197
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
198
      }
199
    } else {
200
      num = pDataBlockInfo->rows - startPos;
3,689,226✔
201
      if (item != NULL) {
3,689,226!
202
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
203
      }
204
    }
205
  } else {  // desc
206
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
36,561,965!
207
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
36,143,693!
208
      if (item != NULL) {
36,206,994!
209
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
210
      }
211
    } else {
212
      num = pDataBlockInfo->rows - startPos;
418,272✔
213
      if (item != NULL) {
418,272!
214
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
215
      }
216
    }
217
  }
218

219
  return num;
195,753,642✔
220
}
221

222
/**
 * @brief Compute, for every interpolating function of `pSup`, the value at the window border `windowKey`
 *        and store it as the start or end point of the function context.
 *
 * The border value is linearly interpolated between the point (prevTs, value of the previous row) and
 * the point (curTs, value of row `curRowIndex`). For elapsed functions no interpolation is performed
 * and the stored value stays 0.
 *
 * @param pPrevValues   saved values of the previous block; entry 0 is the timestamp, entries 1.. are read
 *                      in the order of the interpolating functions
 * @param pDataBlock    columns of the current block
 * @param prevRowIndex  row of the previous point, or -1 to take it from `pPrevValues`
 * @param type          RESULT_ROW_START_INTERP stores the start point, anything else the end point
 */
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t index = 1;  // slot in pPrevValues of the next interpolating function
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
      // NOTE(review): the start key is reset here even when `type` asks for the end border - confirm intent
      pCtx[k].start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pParam = &pCtx[k].param[0];
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

    double v1 = 0, v2 = 0, v = 0;
    if (prevRowIndex == -1) {
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
    } else {
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex), typeGetTypeModFromColInfo(&pColInfo->info));
    }

    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex), typeGetTypeModFromColInfo(&pColInfo->info));

    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
    SPoint point = (SPoint){.key = windowKey, .val = &v};

    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
    }

    if (type == RESULT_ROW_START_INTERP) {
      pCtx[k].start.key = point.key;
      pCtx[k].start.val = v;
    } else {
      pCtx[k].end.key = point.key;
      pCtx[k].end.val = v;
    }

    index += 1;
  }
}
291

292
// Mark the chosen border (start for RESULT_ROW_START_INTERP, end otherwise) of the first `numOfOutput`
// function contexts as carrying no interpolated point by resetting its key to INT64_MIN.
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  const bool startBorder = (type == RESULT_ROW_START_INTERP);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (startBorder) {
      pCtx[i].start.key = INT64_MIN;
    } else {
      pCtx[i].end.key = INT64_MIN;
    }
  }
}
303

304
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
4,904,463✔
305
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
306
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
4,904,463✔
307

308
  TSKEY curTs = tsCols[pos];
4,904,463✔
309

310
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
4,904,463✔
311
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
4,903,841✔
312

313
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
314
  // start exactly from this point, no need to do interpolation
315
  TSKEY key = ascQuery ? win->skey : win->ekey;
4,903,841!
316
  if (key == curTs) {
4,903,841✔
317
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
321,950✔
318
    return true;
321,950✔
319
  }
320

321
  // it is the first time window, no need to do interpolation
322
  if (pTsKey->isNull && pos == 0) {
4,581,891!
323
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
101,332✔
324
  } else {
325
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
4,480,559!
326
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
4,480,559✔
327
                              RESULT_ROW_START_INTERP, pSup);
328
  }
329

330
  return true;
4,584,206✔
331
}
332

333
/**
 * @brief Prepare the end-border interpolation of window `win`.
 *
 * @param endRowIndex   last row of the window in the current block
 * @param nextRowIndex  first row after the window; must not be negative when interpolation is required
 * @param blockEkey     last key of the block in scan direction
 * @param pRes          out: true when the end border has been settled (row exactly on the border, or
 *                      value interpolated); false when the window does not end in this block or on error
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when `nextRowIndex` is negative
 */
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
  int32_t order = pInfo->binfo.inputTsOrder;

  TSKEY actualEndKey = tsCols[endRowIndex];
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;

  // not ended in current data block, do not invoke interpolation
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    (*pRes) = false;
    return TSDB_CODE_SUCCESS;
  }

  // there is actual end point of current time window, no interpolation needs
  if (key == actualEndKey) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    (*pRes) = true;
    return TSDB_CODE_SUCCESS;
  }

  if (nextRowIndex < 0) {
    (*pRes) = false;  // keep the out-parameter defined on the error path as well
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  TSKEY nextKey = tsCols[nextRowIndex];
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
                            RESULT_ROW_END_INTERP, pSup);
  (*pRes) = true;
  return TSDB_CODE_SUCCESS;
}
368

369
/**
 * @brief Tell whether window `pWin` has to be calculated for the range [calStart, calEnd].
 *
 * When interval and sliding are equal every window qualifies. Otherwise a window is rejected when it
 * lies completely outside the range, or - for STREAM_PULL_DATA blocks - when it starts before `calStart`.
 */
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
  if (pInterval->interval == pInterval->sliding) {
    return true;
  }

  if (pWin->ekey < calStart || pWin->skey > calEnd) {
    return false;
  }

  return !(blockType == STREAM_PULL_DATA && pWin->skey < calStart);
}
377

378
// Same check as inCalSlidingWindow(), with the calculation range and the block type taken from `pBlockInfo`.
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  TSKEY calStart = pBlockInfo->calWin.skey;
  TSKEY calEnd = pBlockInfo->calWin.ekey;

  return inCalSlidingWindow(pInterval, pWin, calStart, calEnd, pBlockInfo->type);
}
381

382
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
198,090,070✔
383
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
384
  bool ascQuery = (order == TSDB_ORDER_ASC);
198,090,070✔
385

386
  int32_t precision = pInterval->precision;
198,090,070✔
387
  getNextTimeWindow(pInterval, pNext, order);
198,090,070✔
388

389
  // next time window is not in current block
390
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
197,628,338!
391
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
195,485,288✔
392
    return -1;
2,397,935✔
393
  }
394

395
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
195,230,403!
396
    return -1;
30✔
397
  }
398

399
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
195,491,521✔
400
  int32_t startPos = 0;
195,491,521✔
401

402
  // tumbling time window query, a special case of sliding time window query
403
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
195,491,521!
404
    startPos = prevPosition + 1;
114,154,776✔
405
  } else {
406
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
81,336,745!
407
      startPos = 0;
2,156,136✔
408
    } else {
409
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
79,180,609✔
410
    }
411
  }
412

413
  /* interp query with fill should not skip time window */
414
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
415
  //    return startPos;
416
  //  }
417

418
  /*
419
   * This time window does not cover any data, try next time window,
420
   * this case may happen when the time window is too small
421
   */
422
  if (primaryKeys != NULL) {
195,091,071✔
423
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
259,261,282✔
424
      TSKEY next = primaryKeys[startPos];
64,138,816✔
425
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
64,138,816!
426
        pNext->skey = taosTimeTruncate(next, pInterval);
306,315✔
427
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
331,562✔
428
      } else {
429
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
63,832,501✔
430
        pNext->skey = pNext->ekey - pInterval->interval + 1;
63,832,501✔
431
      }
432
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
130,941,279✔
433
      TSKEY next = primaryKeys[startPos];
21,785,875✔
434
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
21,785,875!
435
        pNext->skey = taosTimeTruncate(next, pInterval);
×
436
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
437
      } else {
438
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
21,810,396✔
439
        pNext->ekey = pNext->skey + pInterval->interval - 1;
21,810,396✔
440
      }
441
    }
442
  }
443

444
  return startPos;
195,157,963✔
445
}
446

447
// Report whether the requested border of the result row has already been handled: the start border
// for RESULT_ROW_START_INTERP, the end border for any other type.
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
  return (type == RESULT_ROW_START_INTERP) ? (pResult->startInterp == true) : (pResult->endInterp == true);
}
454

455
// Flag the requested border of the result row as handled: the start border for
// RESULT_ROW_START_INTERP, the end border for any other type.
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  if (type != RESULT_ROW_START_INTERP) {
    pResult->endInterp = true;
    return;
  }

  pResult->startInterp = true;
}
462

463
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
186,818,979✔
464
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
465
  int32_t code = TSDB_CODE_SUCCESS;
186,818,979✔
466
  int32_t lino = 0;
186,818,979✔
467
  if (!pInfo->timeWindowInterpo) {
186,818,979✔
468
    return code;
182,291,615✔
469
  }
470

471
  if (pBlock == NULL) {
4,527,364!
472
    code = TSDB_CODE_INVALID_PARA;
×
473
    return code;
×
474
  }
475

476
  if (pBlock->pDataBlock == NULL) {
4,527,364!
477
    return code;
×
478
  }
479

480
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
4,527,364✔
481

482
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
4,916,775✔
483
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
4,916,775✔
484
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
4,916,735✔
485
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
4,904,460✔
486
    if (interp) {
4,905,999!
487
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
4,906,007✔
488
    }
489
  } else {
490
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
12,275✔
491
  }
492

493
  // point interpolation does not require the end key time window interpolation.
494
  // interpolation query does not generate the time window end interpolation
495
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
4,918,500✔
496
  if (!done) {
4,918,509✔
497
    int32_t endRowIndex = startPos + forwardRows - 1;
4,918,502✔
498
    int32_t nextRowIndex = endRowIndex + 1;
4,918,502✔
499

500
    // duplicated ts row does not involve in the interpolation of end value for current time window
501
    int32_t x = endRowIndex;
4,918,502✔
502
    while (x > 0) {
4,918,520✔
503
      if (tsCols[x] == tsCols[x - 1]) {
4,884,191✔
504
        x -= 1;
18✔
505
      } else {
506
        endRowIndex = x;
4,884,173✔
507
        break;
4,884,173✔
508
      }
509
    }
510

511
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
4,918,502!
512
    bool  interp = false;
4,918,502✔
513
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols,
4,918,502✔
514
                                           endKey, win, &interp);
515
    QUERY_CHECK_CODE(code, lino, _end);
4,918,484!
516
    if (interp) {
4,918,484✔
517
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
4,802,915✔
518
    }
519
  } else {
520
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
7✔
521
  }
522

523
_end:
4,918,669✔
524
  if (code != TSDB_CODE_SUCCESS) {
4,918,669!
525
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
526
  }
527
  return code;
4,918,666✔
528
}
529

530
/**
 * @brief Remember, for every entry of `pPrevKeys`, the last non-NULL value of its column in `pBlock`.
 *
 * Entry k of `pPrevKeys` is paired with entry k of `pCols`, whose slot id selects the column.
 * A column that holds only NULLs leaves its entry untouched. Nothing is done for a block without columns.
 */
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  size_t numOfKeys = taosArrayGetSize(pPrevKeys);
  for (int32_t k = 0; k < numOfKeys; ++k) {
    SColumn*         pCol = taosArrayGet(pCols, k);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    SGroupKeys*      pSaved = taosArrayGet(pPrevKeys, k);

    // walk backwards to the last row that carries a value
    int32_t row = pBlock->info.rows - 1;
    while (row >= 0 && colDataIsNull_s(pColInfo, row)) {
      --row;
    }
    if (row < 0) {
      continue;
    }

    char* val = colDataGetData(pColInfo, row);
    if (IS_VAR_DATA_TYPE(pSaved->type)) {
      memcpy(pSaved->pData, val, varDataTLen(val));
    } else {
      memcpy(pSaved->pData, val, pSaved->bytes);
    }
  }
}
558

559
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
41,514✔
560
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
561
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
41,514✔
562

563
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
41,514✔
564
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
41,514✔
565

566
  int32_t startPos = 0;
41,514✔
567
  int32_t numOfOutput = pSup->numOfExprs;
41,514✔
568

569
  SResultRow* pResult = NULL;
41,514✔
570

571
  while (1) {
346✔
572
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
41,860✔
573
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
41,859✔
574
    uint64_t            groupId = pOpenWin->groupId;
41,859✔
575
    SResultRowPosition* p1 = &pOpenWin->pos;
41,859✔
576
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
41,859!
577
      break;
41,514✔
578
    }
579

580
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
345✔
581
    if (NULL == pr) {
346!
582
      T_LONG_JMP(pTaskInfo->env, terrno);
×
583
    }
584

585
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
346!
586
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
587
      T_LONG_JMP(pTaskInfo->env, terrno);
×
588
    }
589

590
    if (pr->closed) {
346!
591
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
592
             isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)) ) {
×
593
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
594
        T_LONG_JMP(pTaskInfo->env, terrno);
×
595
      }
596
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
597
      taosMemoryFree(pNode);
×
598
      continue;
×
599
    }
600

601
    STimeWindow w = pr->win;
346✔
602
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
346✔
603
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
604
    if (ret != TSDB_CODE_SUCCESS) {
346!
605
      T_LONG_JMP(pTaskInfo->env, ret);
×
606
    }
607

608
    if(isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
346!
609
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
610
      T_LONG_JMP(pTaskInfo->env, terrno);
×
611
    }
612

613
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
346✔
614
    if (!pTsKey) {
346!
615
      pTaskInfo->code = terrno;
×
616
      T_LONG_JMP(pTaskInfo->env, terrno);
×
617
    }
618

619
    int64_t     prevTs = *(int64_t*)pTsKey->pData;
346✔
620
    if (groupId == pBlock->info.id.groupId) {
346!
621
      TSKEY curTs = pBlock->info.window.skey;
346✔
622
      if (tsCols != NULL) {
346!
623
        curTs = tsCols[startPos];
346✔
624
      }
625
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
346✔
626
                                RESULT_ROW_END_INTERP, pSup);
627
    }
628

629
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
346✔
630
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
346✔
631

632
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
346✔
633
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
346✔
634
                                          pBlock->info.rows, numOfExprs);
346✔
635
    if (ret != TSDB_CODE_SUCCESS) {
346!
636
      T_LONG_JMP(pTaskInfo->env, ret);
×
637
    }
638

639
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
346!
640
      closeResultRow(pr);
346✔
641
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
346✔
642
      taosMemoryFree(pNode);
346!
643
    } else {  // the remains are can not be closed yet.
644
      break;
×
645
    }
646
  }
647
}
41,514✔
648

649
static bool tsKeyCompFn(void* l, void* r, void* param) {
14,251,158✔
650
  TSKEY*                    lTS = (TSKEY*)l;
14,251,158✔
651
  TSKEY*                    rTS = (TSKEY*)r;
14,251,158✔
652
  SIntervalAggOperatorInfo* pInfo = param;
14,251,158✔
653
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
14,251,158✔
654
}
655

656
// Check whether a result row already exists for the window starting at win->skey in group
// `tableGroupId`, by looking the window key up in the operator's result-row hash table.
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
  char winKey[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
  SET_RES_WINDOW_KEY(winKey, (char*)&win->skey, sizeof(TSKEY), tableGroupId);

  void* pHit = tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, winKey, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
  return pHit != NULL;
}
661

662
/**
663
 * @brief check if cur window should be filtered out by limit info
664
 * @retval true if should be filtered out
665
 * @retval false if not filtering out
666
 * @note If no limit info, we skip filtering.
667
 *       If input/output ts order mismatch, we skip filtering too.
668
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
669
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
670
 *       every tuple in every block.
671
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
672
 */
673
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId, SExecTaskInfo* pTaskInfo) {
189,526,653✔
674
  int32_t code = TSDB_CODE_SUCCESS;
189,526,653✔
675
  int32_t lino = 0;
189,526,653✔
676
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
189,526,653✔
677
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
11,112,850✔
678
      // if input/output ts order mismatch, no filter
679
  ) {
680
    return false;
185,726,639✔
681
  }
682

683
  if (pOperatorInfo->limit == 0) return true;
3,800,014✔
684

685
  if (pOperatorInfo->pBQ == NULL) {
3,799,396✔
686
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
22,395✔
687
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
22,386!
688
  }
689

690
  bool shouldFilter = false;
3,799,387✔
691
  // if BQ has been full, compare it with top of BQ
692
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
3,799,387✔
693
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
1,576,572✔
694
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
1,573,837✔
695
  }
696
  if (shouldFilter) {
3,773,077✔
697
    return true;
91,197✔
698
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
3,681,880✔
699
    return false;
2,045,737✔
700
  }
701

702
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
703
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
1,697,429!
704
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
1,726,583!
705

706
  *((TSKEY*)node.data) = win->skey;
1,726,583✔
707

708
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
1,726,583!
709
    taosMemoryFree(node.data);
×
710
    return true;
×
711
  }
712

713
_end:
1,642,630✔
714
  if (code != TSDB_CODE_SUCCESS) {
1,642,630!
715
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
716
    pTaskInfo->code = code;
×
717
    T_LONG_JMP(pTaskInfo->env, code);
×
718
  }
719
  return false;
1,642,630✔
720
}
721

722
/*
 * Aggregate one input block into the interval windows of a hash-interval operator.
 *
 * The first window is derived from the start timestamp of the block; the following windows are visited with
 * getNextQualifiedWindow() until the block is exhausted or filterWindowWithLimit() rejects a window.
 *
 * Returns true only when a new group arrives after the SLIMIT quota (pInfo->slimit) has been used up, which lets
 * the caller stop consuming input; returns false in every other case. Failures are reported with a long jump to
 * pTaskInfo->env and never return to the caller.
 */
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  if (tableGroupId != pInfo->curGroupId) {
    pInfo->handledGroupNum += 1;
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
      return true;
    }

    // a new group starts: drop the bounded queue so that the LIMIT filter starts from scratch for it
    pInfo->curGroupId = tableGroupId;
    destroyBoundedQueue(pInfo->pBQ);
    pInfo->pBQ = NULL;
  }

  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) {
    return false;
  }

  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret == TSDB_CODE_SUCCESS && pResult == NULL) {
    // never long-jump with a success code: report the missing result row as an internal error instead
    ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  // the far border of the window in scan direction
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 pInfo->binfo.inputTsOrder);

  // prev time window not interpolation yet.
  if (pInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                        pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  doCloseWindow(pResultRowInfo, pInfo, pResult);

  // walk through the remaining windows that are covered by this block
  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      pInfo->binfo.inputTsOrder);
    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
      break;
    }

    // null data, failed to allocate more memory buffer
    ret = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                 pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret == TSDB_CODE_SUCCESS && pResult == NULL) {
      ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           pInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // TODO: add to open window? how to close the open windows after input blocks exhausted?
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                          pBlock->info.rows, numOfOutput);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    doCloseWindow(pResultRowInfo, pInfo, pResult);
  }

  if (pInfo->timeWindowInterpo) {
    // keep the last row of the block; it is stored in pPrevValues for the interpolation columns
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }

  return false;
}
837

838
/*
 * Close a result row once its end border has been interpolated.
 *
 * Nothing happens unless interpolation is enabled for the operator and pResult carries the
 * RESULT_ROW_END_INTERP mark. Otherwise the row is closed and the head entry of the open-window list is
 * popped and released.
 */
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }

  if (!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    return;
  }

  // current result is done in computing final results.
  closeResultRow(pResult);

  SListNode* pHead = tdListPopHead(pResultRowInfo->openWindow);
  taosMemoryFree(pHead);
}
190,201,057✔
846

847
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
41,514✔
848
                                       SExecTaskInfo* pTaskInfo) {
849
  int32_t         code = TSDB_CODE_SUCCESS;
41,514✔
850
  int32_t         lino = 0;
41,514✔
851
  SOpenWindowInfo openWin = {0};
41,514✔
852
  openWin.pos.pageId = pResult->pageId;
41,514✔
853
  openWin.pos.offset = pResult->offset;
41,514✔
854
  openWin.groupId = groupId;
41,514✔
855
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
41,514✔
856
  if (pn == NULL) {
41,514✔
857
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
39,050✔
858
    QUERY_CHECK_CODE(code, lino, _end);
39,050!
859
    return openWin.pos;
39,050✔
860
  }
861

862
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
2,464✔
863
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
2,464!
864
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
346✔
865
    QUERY_CHECK_CODE(code, lino, _end);
346!
866
  }
867

868
_end:
2,464✔
869
  if (code != TSDB_CODE_SUCCESS) {
2,464!
870
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
871
    pTaskInfo->code = code;
×
872
    T_LONG_JMP(pTaskInfo->env, code);
×
873
  }
874
  return openWin.pos;
2,464✔
875
}
876

877
/*
 * Return the primary timestamp column of pBlock as a plain int64_t array.
 *
 * NULL is returned when the block has no column list or its data is not loaded. When the first timestamp is
 * non-zero and the block window is still {0, 0}, the window is refreshed with blockDataUpdateTsWindow().
 * A missing column or a failed window update is stored in pTaskInfo->code and reported with a long jump to
 * pTaskInfo->env.
 */
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  if (pColDataInfo == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
  if (tsCols == NULL || pBlock->info.rows <= 0) {
    // nothing to inspect: do not touch tsCols[0] / tsCols[rows - 1] of a missing or empty column
    return tsCols;
  }

  if (tsCols[0] == 0) {
    qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0], tsCols[pBlock->info.rows - 1]);
  }

  // the block window has not been filled in yet, derive it from the timestamp column
  if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
    int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
904

905
/*
 * Open function of the interval operator: consume the downstream input and build all interval windows.
 *
 * Every block is run through the optional scalar expressions, bound to the aggregate contexts and handed to
 * hashIntervalAgg(). Reading stops when the downstream is exhausted or hashIntervalAgg() returns true. After
 * that the grouped result info is initialized and the operator is marked as opened, so later calls return
 * immediately.
 *
 * Returns TSDB_CODE_SUCCESS; any failure is logged, stored in pTaskInfo->code and reported with a long jump to
 * pTaskInfo->env.
 */
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                pSup = &pOperator->exprSupp;

  int32_t scanFlag = MAIN_SCAN;
  int64_t st = taosGetTimestampUs();

  // stays false until the grouped result info has been initialized below
  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    // true means the SLIMIT quota is exhausted, no more input is needed
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) {
      break;
    }
  }

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  OPTR_SET_OPENED(pOperator);

  // st is in microseconds, the cost is kept in milliseconds
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
958

959
/* Store val as the state key of the state window that is currently being built. */
static void keepStateWindowKey(SStateWindowOperatorInfo* pInfo, char* val, int32_t bytes) {
  if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
    varDataCopy(pInfo->stateKey.pData, val);
  } else {
    memcpy(pInfo->stateKey.pData, val, bytes);
  }
}

/*
 * Split one input block into state windows and apply the aggregate functions to each of them.
 *
 * Rows whose state column is NULL are skipped. A window is started when the group id changes or no state key
 * has been kept yet, and whenever compareVal() reports that the value differs from the kept state key; in the
 * latter case the rows collected so far are aggregated first. The rows collected at the end of the block are
 * aggregated with the window end set to the last timestamp of the block.
 *
 * Failures are reported with a long jump to pTaskInfo->env.
 */
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
  if (!pStateColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }
  int64_t gid = pBlock->info.id.groupId;

  bool    hasResult = false;
  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int32_t bytes = pStateColInfoData->info.bytes;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (!pColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  // the pre-aggregation entry of the state column does not change while the rows are scanned
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
      continue;
    }

    hasResult = true;
    if (pStateColInfoData->pData == NULL) {
      qError("%s:%d state column data is null", __FILE__, __LINE__);
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    char* val = colDataGetData(pStateColInfoData, j);

    if (gid != pRowSup->groupId || !pInfo->hasKey) {
      // first non-null value of a group: it defines the state of the window that starts here
      keepStateWindowKey(pInfo, val, bytes);
      pInfo->hasKey = true;

      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
    } else if (compareVal(val, &pInfo->stateKey)) {
      // same state as before, the row extends the current window
      doKeepTuple(pRowSup, tsList[j], gid);
    } else {  // a new state window started
      SResultRow* pResult = NULL;

      // keep the time window for the closed time window.
      STimeWindow window = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
        T_LONG_JMP(pTaskInfo->env, ret);
      }

      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
      if (ret != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, ret);
      }

      // here we start a new state window
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);

      keepStateWindowKey(pInfo, val, bytes);
    }
  }

  if (!hasResult) {
    return;
  }

  // aggregate the rows that were collected but not flushed inside the loop
  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
1066

1067
/*
 * Open function of the state window operator: consume the whole downstream input and build all state windows.
 *
 * Every block is bound to the aggregate contexts, gets its timestamp window refreshed, is run through the
 * optional scalar expressions and is then handed to doStateWindowAggImpl(). Afterwards the grouped result info
 * is initialized and the operator status becomes OP_RES_TO_RETURN.
 *
 * Returns TSDB_CODE_SUCCESS; any failure is logged, stored in pTaskInfo->code and reported with a long jump to
 * pTaskInfo->env.
 */
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = pInfo->binfo.inputTsOrder;
  int64_t    st = taosGetTimestampUs();

  // stays false until the grouped result info has been initialized below
  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      // keep the result in the local code: pTaskInfo->code is only written on failure, so a code that is
      // already stored in the task is never overwritten with success
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doStateWindowAggImpl(pOperator, pInfo, pBlock);
  }

  // st is in microseconds, the cost is kept in milliseconds
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;
  pOperator->status = OP_RES_TO_RETURN;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1122

1123
/*
 * Produce the next result block of the state window operator.
 *
 * The operator is opened through its _openFn, then result blocks are built and filtered until one of them
 * contains rows or no results remain; in the latter case the operator is marked completed. *ppRes receives the
 * result block, or NULL when the operator is already done or the block holds no rows.
 *
 * Returns TSDB_CODE_SUCCESS; any failure is logged, stored in pTaskInfo->code and reported with a long jump to
 * pTaskInfo->env.
 */
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (OP_EXEC_DONE == pOperator->status) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  // keep building blocks while the filter removes every row and more results are pending
  do {
    doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);

    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }
  } while (pBInfo->pRes->info.rows <= 0);

  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pBInfo->pRes->info.rows == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pBInfo->pRes;
  }
  return code;
}
1168

1169
/*
 * Produce the next result block of the interval operator.
 *
 * The operator is opened through its _openFn, then result blocks are built and filtered until one of them
 * contains rows or no results remain; in the latter case the operator is marked completed. *ppRes receives the
 * result block, or NULL when the operator is already done or the block holds no rows.
 *
 * Returns TSDB_CODE_SUCCESS; any failure is logged, stored in pTaskInfo->code and reported with a long jump to
 * pTaskInfo->env.
 */
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  size_t                    rows = 0;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  if (OP_EXEC_DONE == pOperator->status) {
    *ppRes = NULL;
    return code;
  }

  SSDataBlock* pBlock = pInfo->binfo.pRes;
  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  // keep building blocks while the filter removes every row and more results are pending
  do {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);

    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }
  } while (pBlock->info.rows <= 0);

  rows = pBlock->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (rows == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pBlock;
  }
  return code;
}
1212

1213
static void destroyStateWindowOperatorInfo(void* param) {
64,568✔
1214
  if (param == NULL) {
64,568!
1215
    return;
×
1216
  }
1217
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
64,568✔
1218
  cleanupBasicInfo(&pInfo->binfo);
64,568✔
1219
  taosMemoryFreeClear(pInfo->stateKey.pData);
64,568!
1220
  if (pInfo->pOperator) {
64,568!
1221
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
64,568✔
1222
                      pInfo->cleanGroupResInfo);
64,568✔
1223
    pInfo->pOperator = NULL;
64,568✔
1224
  }
1225

1226
  cleanupExprSupp(&pInfo->scalarSup);
64,568✔
1227
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
64,568✔
1228
  cleanupAggSup(&pInfo->aggSup);
64,568✔
1229
  cleanupGroupResInfo(&pInfo->groupResInfo);
64,568✔
1230

1231
  taosMemoryFreeClear(param);
64,568!
1232
}
1233

1234
static void freeItem(void* param) {
117,566✔
1235
  SGroupKeys* pKey = (SGroupKeys*)param;
117,566✔
1236
  taosMemoryFree(pKey->pData);
117,566!
1237
}
117,566✔
1238

1239
void destroyIntervalOperatorInfo(void* param) {
1,646,589✔
1240
  if (param == NULL) {
1,646,589!
1241
    return;
×
1242
  }
1243

1244
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
1,646,589✔
1245

1246
  cleanupBasicInfo(&pInfo->binfo);
1,646,589✔
1247
  if (pInfo->pOperator) {
1,646,717✔
1248
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,646,714✔
1249
                      pInfo->cleanGroupResInfo);
1,646,714✔
1250
    pInfo->pOperator = NULL;
1,646,596✔
1251
  }
1252

1253
  cleanupAggSup(&pInfo->aggSup);
1,646,599✔
1254
  cleanupExprSupp(&pInfo->scalarSupp);
1,646,675✔
1255

1256
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
1,646,617✔
1257

1258
  taosArrayDestroy(pInfo->pInterpCols);
1,646,616✔
1259
  pInfo->pInterpCols = NULL;
1,646,486✔
1260

1261
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
1,646,486✔
1262
  pInfo->pPrevValues = NULL;
1,646,416✔
1263

1264
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,646,416✔
1265
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,646,677✔
1266
  destroyBoundedQueue(pInfo->pBQ);
1,646,712✔
1267
  taosMemoryFreeClear(param);
1,646,424!
1268
}
1269

1270
/*
 * Decide whether the interval operator has to interpolate window borders and prepare the bookkeeping for it.
 *
 * Interpolation is needed when at least one of the numOfCols function contexts in pCtx satisfies
 * fmIsIntervalInterpoFunc(). In that case pInfo->pInterpCols receives the primary timestamp column followed by
 * the first parameter column of every such function, and pInfo->pPrevValues receives one zeroed value slot per
 * column (the timestamp slot is flagged isNull to denote that no value is assigned yet).
 *
 * *pRes is always set to the decision. Returns TSDB_CODE_SUCCESS or the error code of the failed allocation;
 * arrays that were already created stay attached to pInfo for the caller to release.
 */
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
                                      bool* pRes) {
  bool    needed = false;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   tmp = NULL;
  void*   pPending = NULL;  // value buffer that is allocated but not yet owned by pPrevValues

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
      needed = true;
      break;
    }
  }

  if (needed) {
    pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
    QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);

    pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
    QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);

    {  // ts column
      SColumn c = {0};
      c.colId = 1;
      c.slotId = pInfo->primaryTsIndex;
      c.type = TSDB_DATA_TYPE_TIMESTAMP;
      c.bytes = sizeof(int64_t);
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      SGroupKeys key = {0};
      key.bytes = c.bytes;
      key.type = c.type;
      key.isNull = true;  // to denote no value is assigned yet
      key.pData = taosMemoryCalloc(1, c.bytes);
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

      pPending = key.pData;
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      pPending = NULL;  // the array owns the buffer now
    }

    // one entry per interpolation function, taken from its first parameter column
    for (int32_t i = 0; i < numOfCols; ++i) {
      if (!fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
        continue;
      }

      SExprInfo*   pExpr = pCtx[i].pExpr;
      SFunctParam* pParam = &pExpr->base.pParam[0];

      SColumn c = *pParam->pCol;
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      SGroupKeys key = {0};
      key.bytes = c.bytes;
      key.type = c.type;
      key.isNull = false;
      key.pData = taosMemoryCalloc(1, c.bytes);
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

      pPending = key.pData;
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      pPending = NULL;  // the array owns the buffer now
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // a buffer that never made it into pPrevValues would be lost otherwise
    taosMemoryFreeClear(pPending);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  *pRes = needed;
  return code;
}
1343

1344
/*
 * Create the hash interval aggregate operator described by pPhyNode on top of downstream.
 *
 * On success *pOptrInfo receives the new operator and TSDB_CODE_SUCCESS is returned. On failure the partially
 * built operator info is destroyed, destroyOperatorAndDownstreams() is called for the operator and the
 * downstream, the error is stored in pTaskInfo->code and logged, and the error code is returned.
 */
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pSup->hasWindowOrGroup = true;

  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 512);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                    &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  SInterval interval = {.interval = pPhyNode->interval,
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
                        .timeRange = pPhyNode->timeRange};
  calcIntervalAutoOffset(&interval);

  STimeWindowAggSupp as = {
      .waterMark = pPhyNode->window.watermark,
      .calTrigger = pPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
  };

  pInfo->win = pTaskInfo->window;
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
  pInfo->interval = interval;
  pInfo->twAggSup = as;
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;

  // the window filter works on limit + offset, since the skipped rows still have to be produced
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
    pInfo->limited = true;
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
  }
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
    pInfo->slimited = true;
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
    pInfo->curGroupId = UINT64_MAX;
  }

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->timeWindowInterpo = false;
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
  QUERY_CHECK_CODE(code, lino, _error);
  if (pInfo->timeWindowInterpo) {
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
    // a failed list allocation must leave with an error code, never with TSDB_CODE_SUCCESS
    QUERY_CHECK_NULL(pInfo->binfo.resultRowInfo.openWindow, code, lino, _error, terrno);
  }

  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);

  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }

  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
1471

1472
// todo handle multiple timeline cases. assume no timeline interweaving
1473
/**
 * Split the rows of one input block into session windows and apply the aggregate functions to each window.
 *
 * A row stays in the currently open session while it belongs to the same group and its timestamp is at most
 * pInfo->gap away from the previously kept timestamp (in either direction). Otherwise the rows collected so far
 * are aggregated into the result row of the window being closed and a new window is opened at the current row.
 * The window that is still open when the block ends is aggregated as well; its bookkeeping stays in pInfo->winSup,
 * which is what the first row of the next block is compared against.
 *
 * Errors do not return to the caller: they unwind through T_LONG_JMP(pTaskInfo->env, ...).
 *
 * @param pOperator  session window operator; provides the task info and the aggregate expression contexts
 * @param pInfo      session operator state (gap, timestamp slot id, window bookkeeping, result buffers)
 * @param pBlock     input block whose rows are assigned to session windows
 */
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (!pColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  // Nothing to aggregate in an empty block; returning here also keeps tsList[rows - 1] below inside the array.
  if (pBlock->info.rows <= 0) {
    return;
  }

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int64_t gid = pBlock->info.id.groupId;

  int64_t gap = pInfo->gap;

  // The very first block seen by this operator starts without a previous timestamp.
  if (!pInfo->reptScan) {
    pInfo->reptScan = true;
    pInfo->winSup.prevTs = INT64_MIN;
  }

  // Row counters are per block; prevTs, groupId and win carry over from the previous block.
  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  // In case of ascending or descending order scan data, only one time window needs to be kept for each table.
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
      // first row ever, or first row of another group: open a window without closing anything
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
      doKeepTuple(pRowSup, tsList[j], gid);
    } else {
      // The gap is exceeded: close the current session window and start a new one.
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
        SResultRow* pResult = NULL;

        // keep the time window for the closed time window.
        STimeWindow window = pRowSup->win;

        int32_t ret =
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
          T_LONG_JMP(pTaskInfo->env, ret);
        }

        // pRowSup->numOfRows rows, starting at pRowSup->startRowIndex, belong to the window being closed
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
        ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                              pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows,
                                              numOfOutput);
        if (ret != TSDB_CODE_SUCCESS) {
          T_LONG_JMP(pTaskInfo->env, ret);
        }
      }

      // here we start a new session window
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
    }
  }

  // Aggregate the rows of the window that is still open at the end of the block.
  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
109,166✔
1553

1554
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
181,764✔
1555
  if (pOperator->status == OP_EXEC_DONE) {
181,764✔
1556
    (*ppRes) = NULL;
71,086✔
1557
    return TSDB_CODE_SUCCESS;
71,086✔
1558
  }
1559

1560
  int32_t                  code = TSDB_CODE_SUCCESS;
110,678✔
1561
  int32_t                  lino = 0;
110,678✔
1562
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
110,678✔
1563
  SSessionAggOperatorInfo* pInfo = pOperator->info;
110,678✔
1564
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
110,678✔
1565
  SExprSupp*               pSup = &pOperator->exprSupp;
110,678✔
1566

1567
  if (pOperator->status == OP_RES_TO_RETURN) {
110,678✔
1568
    while (1) {
×
1569
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
23,294✔
1570
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
23,299✔
1571
      QUERY_CHECK_CODE(code, lino, _end);
23,299!
1572

1573
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
23,299✔
1574
      if (!hasRemain) {
23,299✔
1575
        setOperatorCompleted(pOperator);
3,976✔
1576
        break;
3,976✔
1577
      }
1578

1579
      if (pBInfo->pRes->info.rows > 0) {
19,323!
1580
        break;
19,323✔
1581
      }
1582
    }
1583
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
23,299✔
1584
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
23,299!
1585
    return code;
23,299✔
1586
  }
1587

1588
  int64_t st = taosGetTimestampUs();
87,391✔
1589
  int32_t order = pInfo->binfo.inputTsOrder;
87,391✔
1590

1591
  SOperatorInfo* downstream = pOperator->pDownstream[0];
87,391✔
1592

1593
  pInfo->cleanGroupResInfo = false;
87,391✔
1594
  while (1) {
109,166✔
1595
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
196,557✔
1596
    if (pBlock == NULL) {
196,568✔
1597
      break;
87,402✔
1598
    }
1599

1600
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
109,166✔
1601
    if (pInfo->scalarSupp.pExprInfo != NULL) {
109,166✔
1602
      SExprSupp* pExprSup = &pInfo->scalarSupp;
3✔
1603
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
3✔
1604
      QUERY_CHECK_CODE(code, lino, _end);
3!
1605
    }
1606
    // the pDataBlock are always the same one, no need to call this again
1607
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
109,166✔
1608
    QUERY_CHECK_CODE(code, lino, _end);
109,166!
1609

1610
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
109,166✔
1611
    QUERY_CHECK_CODE(code, lino, _end);
109,166!
1612

1613
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
109,166✔
1614
  }
1615

1616
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
87,402✔
1617

1618
  // restore the value
1619
  pOperator->status = OP_RES_TO_RETURN;
87,402✔
1620

1621
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
87,402✔
1622
  QUERY_CHECK_CODE(code, lino, _end);
87,405!
1623
  pInfo->cleanGroupResInfo = true;
87,405✔
1624

1625
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
87,405✔
1626
  QUERY_CHECK_CODE(code, lino, _end);
87,409!
1627
  while (1) {
×
1628
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
87,409✔
1629
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
87,407✔
1630
    QUERY_CHECK_CODE(code, lino, _end);
87,407!
1631

1632
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
87,407✔
1633
    if (!hasRemain) {
87,408✔
1634
      setOperatorCompleted(pOperator);
83,395✔
1635
      break;
83,395✔
1636
    }
1637

1638
    if (pBInfo->pRes->info.rows > 0) {
4,013!
1639
      break;
4,013✔
1640
    }
1641
  }
1642
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
87,408✔
1643

1644
_end:
87,408✔
1645
  if (code != TSDB_CODE_SUCCESS) {
87,408!
1646
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1647
    pTaskInfo->code = code;
×
1648
    T_LONG_JMP(pTaskInfo->env, code);
×
1649
  }
1650
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
87,408✔
1651
  return code;
87,408✔
1652
}
1653

1654
// todo make this a non-blocking operator
1655
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
64,568✔
1656
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1657
  QRY_PARAM_CHECK(pOptrInfo);
64,568!
1658

1659
  int32_t                   code = TSDB_CODE_SUCCESS;
64,568✔
1660
  int32_t                   lino = 0;
64,568✔
1661
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
64,568!
1662
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
64,567!
1663
  if (pInfo == NULL || pOperator == NULL) {
64,568!
1664
    code = terrno;
×
1665
    goto _error;
×
1666
  }
1667

1668
  pOperator->exprSupp.hasWindowOrGroup = true;
64,568✔
1669
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
64,568✔
1670
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
64,568✔
1671

1672
  if (pStateNode->window.pExprs != NULL) {
64,568✔
1673
    int32_t    numOfScalarExpr = 0;
3,710✔
1674
    SExprInfo* pScalarExprInfo = NULL;
3,710✔
1675
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
3,710✔
1676
    QUERY_CHECK_CODE(code, lino, _error);
3,710!
1677

1678
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
3,710✔
1679
    if (code != TSDB_CODE_SUCCESS) {
3,710!
1680
      goto _error;
×
1681
    }
1682
  }
1683

1684
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
64,568✔
1685
  pInfo->stateKey.type = pInfo->stateCol.type;
64,568✔
1686
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
64,568✔
1687
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
64,568!
1688
  if (pInfo->stateKey.pData == NULL) {
64,568!
1689
    goto _error;
×
1690
  }
1691
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
64,568✔
1692
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
64,568✔
1693

1694
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
64,568✔
1695
  if (code != TSDB_CODE_SUCCESS) {
64,568!
1696
    goto _error;
×
1697
  }
1698

1699
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
64,568✔
1700

1701
  int32_t    num = 0;
64,568✔
1702
  SExprInfo* pExprInfo = NULL;
64,568✔
1703
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
64,568✔
1704
  QUERY_CHECK_CODE(code, lino, _error);
64,568!
1705

1706
  initResultSizeInfo(&pOperator->resultInfo, 4096);
64,568✔
1707

1708
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
64,568✔
1709
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
64,568✔
1710
  if (code != TSDB_CODE_SUCCESS) {
64,568!
1711
    goto _error;
×
1712
  }
1713

1714
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
64,568✔
1715
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
64,568!
1716
  initBasicInfo(&pInfo->binfo, pResBlock);
64,568✔
1717
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
64,568✔
1718

1719
  pInfo->twAggSup =
64,568✔
1720
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
64,568✔
1721

1722
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
64,568✔
1723
  QUERY_CHECK_CODE(code, lino, _error);
64,568!
1724

1725
  pInfo->tsSlotId = tsSlotId;
64,568✔
1726
  pInfo->pOperator = pOperator;
64,568✔
1727
  pInfo->cleanGroupResInfo = false;
64,568✔
1728
  pInfo->trueForLimit = pStateNode->trueForLimit;
64,568✔
1729
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
64,568✔
1730
                  pTaskInfo);
1731
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
64,568✔
1732
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1733

1734
  code = appendDownstream(pOperator, &downstream, 1);
64,568✔
1735
  if (code != TSDB_CODE_SUCCESS) {
64,568!
1736
    goto _error;
×
1737
  }
1738

1739
  *pOptrInfo = pOperator;
64,568✔
1740
  return TSDB_CODE_SUCCESS;
64,568✔
1741

1742
_error:
×
1743
  if (pInfo != NULL) {
×
1744
    destroyStateWindowOperatorInfo(pInfo);
×
1745
  }
1746

1747
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1748
  pTaskInfo->code = code;
×
1749
  return code;
×
1750
}
1751

1752
void destroySWindowOperatorInfo(void* param) {
87,408✔
1753
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
87,408✔
1754
  if (pInfo == NULL) {
87,408!
1755
    return;
×
1756
  }
1757

1758
  cleanupBasicInfo(&pInfo->binfo);
87,408✔
1759
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
87,408✔
1760
  if (pInfo->pOperator) {
87,409!
1761
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
87,409✔
1762
                      pInfo->cleanGroupResInfo);
87,409✔
1763
    pInfo->pOperator = NULL;
87,409✔
1764
  }
1765

1766
  cleanupAggSup(&pInfo->aggSup);
87,409✔
1767
  cleanupExprSupp(&pInfo->scalarSupp);
87,409✔
1768

1769
  cleanupGroupResInfo(&pInfo->groupResInfo);
87,409✔
1770
  taosMemoryFreeClear(param);
87,408!
1771
}
1772

1773
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
87,395✔
1774
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1775
  QRY_PARAM_CHECK(pOptrInfo);
87,395!
1776

1777
  int32_t                  code = TSDB_CODE_SUCCESS;
87,395✔
1778
  int32_t                  lino = 0;
87,395✔
1779
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
87,395!
1780
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
87,407!
1781
  if (pInfo == NULL || pOperator == NULL) {
87,407!
1782
    code = terrno;
×
1783
    goto _error;
×
1784
  }
1785

1786
  pOperator->exprSupp.hasWindowOrGroup = true;
87,408✔
1787

1788
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
87,408✔
1789
  initResultSizeInfo(&pOperator->resultInfo, 4096);
87,408✔
1790

1791
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
87,407✔
1792
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
87,409!
1793
  initBasicInfo(&pInfo->binfo, pResBlock);
87,409✔
1794

1795
  int32_t      numOfCols = 0;
87,409✔
1796
  SExprInfo*   pExprInfo = NULL;
87,409✔
1797
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
87,409✔
1798
  QUERY_CHECK_CODE(code, lino, _error);
87,408!
1799

1800
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
87,408✔
1801
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
87,408✔
1802
  QUERY_CHECK_CODE(code, lino, _error);
87,408!
1803

1804
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
87,408✔
1805
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
87,408✔
1806
  pInfo->gap = pSessionNode->gap;
87,408✔
1807

1808
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
87,408✔
1809
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
87,408✔
1810
  QUERY_CHECK_CODE(code, lino, _error);
87,404!
1811

1812
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
87,404✔
1813
  pInfo->binfo.pRes = pResBlock;
87,404✔
1814
  pInfo->winSup.prevTs = INT64_MIN;
87,404✔
1815
  pInfo->reptScan = false;
87,404✔
1816
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
87,404✔
1817
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
87,404✔
1818

1819
  if (pSessionNode->window.pExprs != NULL) {
87,404✔
1820
    int32_t    numOfScalar = 0;
1✔
1821
    SExprInfo* pScalarExprInfo = NULL;
1✔
1822
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
1✔
1823
    QUERY_CHECK_CODE(code, lino, _error);
1!
1824

1825
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
1✔
1826
    QUERY_CHECK_CODE(code, lino, _error);
1!
1827
  }
1828

1829
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
87,404✔
1830
  QUERY_CHECK_CODE(code, lino, _error);
87,403!
1831

1832
  pInfo->pOperator = pOperator;
87,403✔
1833
  pInfo->cleanGroupResInfo = false;
87,403✔
1834
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
87,403✔
1835
                  pInfo, pTaskInfo);
1836
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
87,394✔
1837
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1838
  pOperator->pTaskInfo = pTaskInfo;
87,392✔
1839
  code = appendDownstream(pOperator, &downstream, 1);
87,392✔
1840
  QUERY_CHECK_CODE(code, lino, _error);
87,401!
1841

1842
  *pOptrInfo = pOperator;
87,401✔
1843
  return TSDB_CODE_SUCCESS;
87,401✔
1844

1845
_error:
×
1846
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
1847
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1848
  pTaskInfo->code = code;
×
1849
  return code;
×
1850
}
1851

1852
void destroyMAIOperatorInfo(void* param) {
440,679✔
1853
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
440,679✔
1854
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
440,679✔
1855
  taosMemoryFreeClear(param);
440,680!
1856
}
440,682✔
1857

1858
// Fetch a new result row from the aggregate supporter's result buffer and, when that succeeds, make
// pResultRowInfo->cur point at it. Returns the row, or NULL when getNewResultRow() fails (cur is left untouched).
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
  SResultRow* pRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  if (pRow != NULL) {
    SResultRowPosition pos = {.pageId = pRow->pageId, .offset = pRow->offset};
    pResultRowInfo->cur = pos;
  }

  return pRow;
}
1866

1867
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
56,521,585✔
1868
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
1869
  if (*pResult == NULL) {
56,521,585✔
1870
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
283,807✔
1871
    if (*pResult == NULL) {
283,803!
1872
      return terrno;
×
1873
    }
1874
  }
1875

1876
  // set time window for current result
1877
  (*pResult)->win = (*win);
56,521,581✔
1878
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
56,521,581✔
1879
}
1880

1881
/**
 * Aggregate one input block for the merge-aligned interval operator.
 *
 * Rows are grouped by equal timestamp: every run of rows carrying the same timestamp forms one window that starts
 * at that timestamp and spans one interval. Whenever the timestamp changes, the rows collected so far are
 * aggregated, the result row is finalized into pResultBlock and then reset for the next window. The window that is
 * still open at the end of the block is aggregated but not finalized; miaInfo->curTs remembers it so that the next
 * block (or the caller) can close it.
 *
 * Errors do not return to the caller: they unwind through T_LONG_JMP(pTaskInfo->env, ...).
 *
 * @param pOperatorInfo   merge-aligned interval operator
 * @param pResultRowInfo  result row bookkeeping; `cur` identifies the row being filled
 * @param pBlock          input block
 * @param pResultBlock    block that receives finalized windows
 */
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
  SInterval*     pInterval = &iaInfo->interval;

  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);

  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

  // there is an result exists
  if (miaInfo->curTs != INT64_MIN) {
    if (ts != miaInfo->curTs) {
      // the block starts with another timestamp: close the window left open by the previous block
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
      miaInfo->curTs = ts;
    }
  } else {
    miaInfo->curTs = ts;
  }

  STimeWindow win = {0};
  win.skey = miaInfo->curTs;
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    // longjmp() with 0 is seen as 1 by setjmp(), which would lose the error; always jump with a real code
    if (ret == TSDB_CODE_SUCCESS) {
      ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  int32_t currPos = startPos;

  STimeWindow currWin = win;
  while (++currPos < pBlock->info.rows) {
    if (tsCols[currPos] == miaInfo->curTs) {
      continue;  // same timestamp, the row still belongs to the open window
    }

    // rows [startPos, currPos) form the window that ends here
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));

    // open the window of the new timestamp
    miaInfo->curTs = tsCols[currPos];
    currWin.skey = miaInfo->curTs;
    currWin.ekey =
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

    startPos = currPos;
    // bind the result row to the window just opened (currWin), not to the first window of the block
    ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      if (ret == TSDB_CODE_SUCCESS) {
        ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  // aggregate the rows of the window that is still open at the end of the block
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
344,725✔
1953

1954
// Called once the results of a group have been generated: stamps pRes with the id of that group and resets the
// per-group state (no open window, group id 0) so that the next block starts a fresh group.
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
  // INT64_MIN means "no window is open"
  pMiaInfo->curTs = INT64_MIN;

  // hand the group id over to the result block before it is cleared
  pRes->info.id.groupId = pMiaInfo->groupId;
  pMiaInfo->groupId = 0;
}
303,015✔
1959

1960
/**
 * Pull blocks from downstream 0 and aggregate them into the operator's result block.
 *
 * The loop stops when
 *   - the input is exhausted (the last open window is closed and the operator is marked completed),
 *   - a block of another group arrives (the current group is closed, the block is kept in
 *     pMiaInfo->prefetchedBlock for the next call) and the filtered result is not empty, or
 *   - the result block has reached pOperator->resultInfo.capacity rows.
 *
 * Errors do not return to the caller: the code is logged, stored in pTaskInfo->code and raised through
 * T_LONG_JMP(pTaskInfo->env, code).
 */
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
  int32_t                               code = TSDB_CODE_SUCCESS;
  int32_t                               lino = 0;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;

  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;

  while (1) {
    SSDataBlock* pBlock = NULL;
    if (pMiaInfo->prefetchedBlock == NULL) {
      pBlock = getNextBlockFromDownstream(pOperator, 0);
    } else {
      // resume with the block that was put aside when its group differed from the one being processed
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;

      pMiaInfo->groupId = pBlock->info.id.groupId;
    }

    // no data exists, all query processing is done
    if (pBlock == NULL) {
      // close last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      setOperatorCompleted(pOperator);
      break;
    }

    if (pMiaInfo->groupId == 0) {
      // no group is being tracked yet: adopt the group of this block
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    } else {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        // if there are unclosed time window, close it firstly.
        if (pMiaInfo->curTs == INT64_MIN) {
          // a tracked group without an open window is an inconsistent state; report it with a definite code
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          QUERY_CHECK_CODE(code, lino, _end);
        }
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));

        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
        if (pRes->info.rows == 0) {
          // After filtering for last group, the result is empty, so we need to continue to process next group
          continue;
        } else {
          break;
        }
      } else {
        // continue
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doMergeAlignedIntervalAggImpl(pOperator, pResultRowInfo, pBlock, pRes);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
508,198✔
2050

2051
/**
 * Produce the next result block of the merge-aligned interval operator.
 *
 * The result block is cleared and refilled by doMergeAlignedIntervalAgg(). When binfo.mergeResultBlock is set,
 * the fill step is repeated until the block holds at least resultInfo.threshold rows or the operator is done;
 * otherwise it runs exactly once.
 *
 * @param pOperator  merge-aligned interval operator
 * @param ppRes      receives the result block, or NULL when no rows were produced or the operator is done
 * @return TSDB_CODE_SUCCESS
 */
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                               code = TSDB_CODE_SUCCESS;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);

  if (!iaInfo->binfo.mergeResultBlock) {
    doMergeAlignedIntervalAgg(pOperator);
  } else {
    // keep accumulating until the block is full enough or the input is exhausted
    while (pOperator->status != OP_EXEC_DONE && pRes->info.rows < pOperator->resultInfo.threshold) {
      doMergeAlignedIntervalAgg(pOperator);
    }
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pRes;
  }
  return code;
}
2085

2086
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
440,665✔
2087
                                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2088
  QRY_PARAM_CHECK(pOptrInfo);
440,665!
2089

2090
  int32_t                               code = TSDB_CODE_SUCCESS;
440,665✔
2091
  int32_t                               lino = 0;
440,665✔
2092
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
440,665!
2093
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
440,678!
2094
  if (miaInfo == NULL || pOperator == NULL) {
440,670!
2095
    code = terrno;
×
2096
    goto _error;
×
2097
  }
2098

2099
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
440,670!
2100
  if (miaInfo->intervalAggOperatorInfo == NULL) {
440,667!
2101
    code = terrno;
×
2102
    goto _error;
×
2103
  }
2104

2105
  SInterval interval = {.interval = pNode->interval,
440,667✔
2106
                        .sliding = pNode->sliding,
440,667✔
2107
                        .intervalUnit = pNode->intervalUnit,
440,667✔
2108
                        .slidingUnit = pNode->slidingUnit,
440,667✔
2109
                        .offset = pNode->offset,
440,667✔
2110
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
440,667✔
2111
                        .timeRange = pNode->timeRange};
2112
  calcIntervalAutoOffset(&interval);
440,667✔
2113

2114
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
440,672✔
2115
  SExprSupp*                pSup = &pOperator->exprSupp;
440,672✔
2116
  pSup->hasWindowOrGroup = true;
440,672✔
2117

2118
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
440,672✔
2119
  QUERY_CHECK_CODE(code, lino, _error);
440,674!
2120

2121
  miaInfo->curTs = INT64_MIN;
440,674✔
2122
  iaInfo->win = pTaskInfo->window;
440,674✔
2123
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
440,674✔
2124
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
440,674✔
2125
  iaInfo->interval = interval;
440,674✔
2126
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
440,674✔
2127
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
440,674✔
2128

2129
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
440,674✔
2130
  initResultSizeInfo(&pOperator->resultInfo, 512);
440,674✔
2131

2132
  int32_t    num = 0;
440,675✔
2133
  SExprInfo* pExprInfo = NULL;
440,675✔
2134
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
440,675✔
2135
  QUERY_CHECK_CODE(code, lino, _error);
440,676!
2136

2137
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
440,676✔
2138
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
440,676✔
2139
  QUERY_CHECK_CODE(code, lino, _error);
440,671!
2140

2141
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
440,671✔
2142
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
440,675!
2143
  initBasicInfo(&iaInfo->binfo, pResBlock);
440,675✔
2144
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
440,671✔
2145
  QUERY_CHECK_CODE(code, lino, _error);
440,674!
2146

2147
  iaInfo->timeWindowInterpo = false;
440,674✔
2148
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
440,674✔
2149
  QUERY_CHECK_CODE(code, lino, _error);
440,664!
2150
  if (iaInfo->timeWindowInterpo) {
440,664!
2151
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2152
  }
2153

2154
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
440,664✔
2155
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
440,666✔
2156
  QUERY_CHECK_CODE(code, lino, _error);
440,676!
2157
  iaInfo->pOperator = pOperator;
440,676✔
2158
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
440,676✔
2159
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2160

2161
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
440,678✔
2162
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2163

2164
  code = appendDownstream(pOperator, &downstream, 1);
440,677✔
2165
  QUERY_CHECK_CODE(code, lino, _error);
440,675!
2166

2167
  *pOptrInfo = pOperator;
440,675✔
2168
  return TSDB_CODE_SUCCESS;
440,675✔
2169

2170
_error:
×
2171
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2172
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2173
  pTaskInfo->code = code;
×
2174
  return code;
×
2175
}
2176

2177
//=====================================================================================================================
2178
// merge interval operator
2179
typedef struct SMergeIntervalAggOperatorInfo {
2180
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2181
  SList*                   groupIntervals;
2182
  SListIter                groupIntervalsIter;
2183
  bool                     hasGroupId;
2184
  uint64_t                 groupId;
2185
  SSDataBlock*             prefetchedBlock;
2186
  bool                     inputBlocksFinished;
2187
} SMergeIntervalAggOperatorInfo;
2188

2189
typedef struct SGroupTimeWindow {
2190
  uint64_t    groupId;
2191
  STimeWindow window;
2192
} SGroupTimeWindow;
2193

2194
void destroyMergeIntervalOperatorInfo(void* param) {
×
2195
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2196
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2197
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2198

2199
  taosMemoryFreeClear(param);
×
2200
}
×
2201

2202
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2203
                                        STimeWindow* newWin) {
2204
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2205
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2206
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2207

2208
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2209
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2210
  if (code != TSDB_CODE_SUCCESS) {
×
2211
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2212
    return code;
×
2213
  }
2214

2215
  SListIter iter = {0};
×
2216
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2217
  SListNode* listNode = NULL;
×
2218
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2219
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2220
    if (prevGrpWin->groupId != tableGroupId) {
×
2221
      continue;
×
2222
    }
2223

2224
    STimeWindow* prevWin = &prevGrpWin->window;
×
2225
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2226
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2227
      taosMemoryFreeClear(tmp);
×
2228
    }
2229
  }
2230

2231
  return TSDB_CODE_SUCCESS;
×
2232
}
2233

2234
/**
 * Aggregate one input block into interval windows for the merge interval operator.
 *
 * The block is split into consecutive time windows; for each window the result row is
 * located, the aggregate functions are applied to the rows that fall into the window, the
 * window is closed and then registered through outputPrevIntervalResult().
 *
 * Errors are not returned: every failure leaves the function through
 * T_LONG_JMP(pTaskInfo->env, code) with a non-success error code.
 *
 * @param pOperatorInfo   merge interval operator (info must be a SMergeIntervalAggOperatorInfo)
 * @param pResultRowInfo  result row bookkeeping of the operator
 * @param pBlock          input data block
 * @param scanFlag        scan flag of the block; compared against MAIN_SCAN
 * @param pResultBlock    output block, forwarded to outputPrevIntervalResult()
 */
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
                                        iaInfo->binfo.inputTsOrder);

  int32_t code =
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
  if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
    // A missing result row combined with a success code must not be long-jumped as TSDB_CODE_SUCCESS.
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // For ascending input the window is bounded by its end key, for descending input by its start key.
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 iaInfo->binfo.inputTsOrder);
  if (forwardRows <= 0) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  // prev time window not interpolation yet.
  if (iaInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    code = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                  pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup,
                                  pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // window start key interpolation
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                         forwardRows, pBlock->info.rows, numOfOutput);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }
  doCloseWindow(pResultRowInfo, iaInfo, pResult);

  // output previous interval results after this interval (&win) is closed
  code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // Walk the remaining windows covered by this block.
  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      iaInfo->binfo.inputTsOrder);
    if (startPos < 0) {
      break;
    }

    // null data, failed to allocate more memory buffer
    code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                  pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup,
                                  pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, code);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           iaInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                           forwardRows, pBlock->info.rows, numOfOutput);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    doCloseWindow(pResultRowInfo, iaInfo, pResult);

    // output previous interval results after this interval (&nextWin) is closed
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  // Keep the last row of this block so the next block can interpolate across the block boundary.
  if (iaInfo->timeWindowInterpo) {
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
  }
}
×
2348

2349
/**
 * Produce the next result block of the merge interval operator.
 *
 * Input blocks are pulled from downstream 0 and aggregated until the input ends, a block of a
 * different group arrives (it is kept in prefetchedBlock for the next call), or the result block
 * reaches the operator's row threshold. Once the input has ended, each call takes the next entry
 * of groupIntervals and copies its group id into the result block.
 *
 * @param pOperator  merge interval operator
 * @param ppRes      receives the result block, or NULL when the block holds no rows
 * @return TSDB_CODE_SUCCESS; on failure the code is stored in pTaskInfo->code and the function
 *         leaves through T_LONG_JMP(pTaskInfo->env, code) instead of returning.
 */
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  size_t         rows = 0;  // declared before any `goto _end` so it is never read uninitialised
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!miaInfo->inputBlocksFinished) {
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = getNextBlockFromDownstream(pOperator, 0);
      } else {
        // Resume with the block that ended the previous group and switch to its group id.
        pBlock = miaInfo->prefetchedBlock;
        miaInfo->groupId = pBlock->info.id.groupId;
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
        // Block of another group: keep it for the next call and return what was gathered so far.
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

      pRes->info.scanFlag = pBlock->info.scanFlag;
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

    pRes->info.id.groupId = miaInfo->groupId;
  }

  if (miaInfo->inputBlocksFinished) {
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);

    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
      pRes->info.id.groupId = grpWin->groupId;
    }
  }

  if (pRes->info.rows == 0) {
    setOperatorCompleted(pOperator);
  }

  rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pRes;
  return code;
}
2433

2434
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2435
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2436
  QRY_PARAM_CHECK(pOptrInfo);
×
2437

2438
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2439
  int32_t                        lino = 0;
×
2440
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2441
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2442
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2443
    code = terrno;
×
2444
    goto _error;
×
2445
  }
2446

2447
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2448
                        .sliding = pIntervalPhyNode->sliding,
×
2449
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2450
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2451
                        .offset = pIntervalPhyNode->offset,
×
2452
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2453
                        .timeRange = pIntervalPhyNode->timeRange};
2454
  calcIntervalAutoOffset(&interval);
×
2455

2456
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2457

2458
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2459
  pIntervalInfo->win = pTaskInfo->window;
×
2460
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2461
  pIntervalInfo->interval = interval;
×
2462
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2463
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2464
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2465

2466
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2467
  pExprSupp->hasWindowOrGroup = true;
×
2468

2469
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2470
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2471

2472
  int32_t    num = 0;
×
2473
  SExprInfo* pExprInfo = NULL;
×
2474
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2475
  QUERY_CHECK_CODE(code, lino, _error);
×
2476

2477
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2478
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2479
  if (code != TSDB_CODE_SUCCESS) {
×
2480
    goto _error;
×
2481
  }
2482

2483
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2484
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2485
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2486
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2487
  QUERY_CHECK_CODE(code, lino, _error);
×
2488

2489
  pIntervalInfo->timeWindowInterpo = false;
×
2490
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2491
  QUERY_CHECK_CODE(code, lino, _error);
×
2492
  if (pIntervalInfo->timeWindowInterpo) {
×
2493
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2494
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2495
      goto _error;
×
2496
    }
2497
  }
2498

2499
  pIntervalInfo->pOperator = pOperator;
×
2500
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2501
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2502
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2503
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2504
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2505

2506
  code = appendDownstream(pOperator, &downstream, 1);
×
2507
  if (code != TSDB_CODE_SUCCESS) {
×
2508
    goto _error;
×
2509
  }
2510

2511
  *pOptrInfo = pOperator;
×
2512
  return TSDB_CODE_SUCCESS;
×
2513
_error:
×
2514
  if (pMergeIntervalInfo != NULL) {
×
2515
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2516
  }
2517
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2518
  pTaskInfo->code = code;
×
2519
  return code;
×
2520
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc