• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3608

12 Feb 2025 05:57AM UTC coverage: 63.066% (+1.4%) from 61.715%
#3608

push

travis-ci

web-flow
Merge pull request #29746 from taosdata/merge/mainto3.02

merge: from main to 3.0 branch

140199 of 286257 branches covered (48.98%)

Branch coverage included in aggregate %.

89 of 161 new or added lines in 18 files covered. (55.28%)

3211 existing lines in 190 files now uncovered.

218998 of 283298 relevant lines covered (77.3%)

5949310.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.8
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "tchecksum.h"
22
#include "tcommon.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "tfill.h"
26
#include "tglobal.h"
27
#include "tlog.h"
28
#include "ttime.h"
29

30
typedef struct SSessionAggOperatorInfo {
31
  SOptrBasicInfo     binfo;
32
  SAggSupporter      aggSup;
33
  SExprSupp          scalarSupp;  // supporter for perform scalar function
34
  SGroupResInfo      groupResInfo;
35
  SWindowRowsSup     winSup;
36
  bool               reptScan;  // next round scan
37
  int64_t            gap;       // session window gap
38
  int32_t            tsSlotId;  // primary timestamp slot id
39
  STimeWindowAggSupp twAggSup;
40
  SOperatorInfo*     pOperator;
41
  bool               cleanGroupResInfo;
42
} SSessionAggOperatorInfo;
43

44
typedef struct SStateWindowOperatorInfo {
45
  SOptrBasicInfo     binfo;
46
  SAggSupporter      aggSup;
47
  SExprSupp          scalarSup;
48
  SGroupResInfo      groupResInfo;
49
  SWindowRowsSup     winSup;
50
  SColumn            stateCol;  // start row index
51
  bool               hasKey;
52
  SStateKeys         stateKey;
53
  int32_t            tsSlotId;  // primary timestamp column slot id
54
  STimeWindowAggSupp twAggSup;
55
  SOperatorInfo*     pOperator;
56
  bool               cleanGroupResInfo;
57
} SStateWindowOperatorInfo;
58

59
// Selects which border of a result row's time window an interpolation step refers to.
typedef enum SResultTsInterpType {
  RESULT_ROW_START_INTERP = 1,  // window start border
  RESULT_ROW_END_INTERP = 2,    // window end border
} SResultTsInterpType;
63

64
typedef struct SOpenWindowInfo {
65
  SResultRowPosition pos;
66
  uint64_t           groupId;
67
} SOpenWindowInfo;
68

69
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
70

71
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
72
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
73
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
74

75
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
96,982,191✔
76
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
77
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
78
                                      SExecTaskInfo* pTaskInfo) {
79
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
96,982,191✔
80
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
81

82
  if (pResultRow == NULL || pTaskInfo->code != 0) {
97,788,070!
83
    *pResult = NULL;
×
84
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
85
    return pTaskInfo->code;
×
86
  }
87

88
  // set time window for current result
89
  pResultRow->win = (*win);
97,885,332✔
90

91
  *pResult = pResultRow;
97,885,332✔
92
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
97,885,332✔
93
}
94

95
// Account for one more row in the window being built: count it, and move both the window end key
// and the previous-timestamp marker to ts.
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->numOfRows += 1;
  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
}
101

102
// Begin a new window at rowIndex: reset the row counter and take the start key from tsList[rowIndex].
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->win.skey = tsList[rowIndex];
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = rowIndex;
}
109

110
/**
 * @brief Count how many rows, starting at pos, fall inside the window that ends at ekey.
 *
 * searchFn is run on the sub-array pData[pos .. numOfRows) and its (relative) result is the base
 * count. Every following row whose timestamp equals ekey is added as well, so duplicates of the
 * end key are all included.
 *
 * @param numOfRows total number of rows in pData
 * @param searchFn  search routine applied to the remaining rows
 * @param ekey      boundary key of the current window
 * @param pos       index of the first row to consider
 * @param order     scan order, forwarded to searchFn
 * @param pData     timestamp column of the block
 * @return number of rows to consume, 0 when searchFn reports a negative position
 *
 * @note The ascending and descending branches were identical and are merged. The duplicate scan
 *       now stops at numOfRows; previously it had no upper bound and could read past the column
 *       if the block ended on a run of ekey values.
 */
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
  int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
  if (end < 0) {
    return 0;
  }

  int32_t forwardRows = end;

  // include every row that carries exactly the boundary timestamp
  for (int32_t idx = pos + end; idx < numOfRows && pData[idx] == ekey; ++idx) {
    forwardRows += 1;
  }

  return forwardRows;
}
146

147
/**
 * @brief Binary search over a timestamp array for the position related to key.
 *
 * pValue is interpreted as an array of num TSKEY values. With order == TSDB_ORDER_DESC the
 * comparisons assume keys laid out for a descending scan; any other order uses the ascending
 * comparisons.
 *
 * @return the head of the remaining range when key is on or before it in scan direction, the
 *         index of an exact match, the index right after the tail when key lies beyond it,
 *         or -1 when num <= 0 or that index would fall outside the array
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  const bool descending = (order == TSDB_ORDER_DESC);
  TSKEY*     keys = (TSKEY*)pValue;
  int32_t    lo = 0;
  int32_t    hi = num - 1;

  for (;;) {
    // key is at or before the head of the remaining range (in scan direction)
    bool atHead = descending ? (key >= keys[lo]) : (key <= keys[lo]);
    if (atHead) {
      return lo;
    }

    if (key == keys[hi]) {
      return hi;
    }

    // key lies beyond the tail of the remaining range
    bool pastTail = descending ? (key < keys[hi]) : (key > keys[hi]);
    if (pastTail) {
      int32_t next = hi + 1;
      return (next >= num) ? -1 : next;
    }

    int32_t span = hi - lo + 1;
    int32_t mid = (span >> 1) + lo;
    if (key == keys[mid]) {
      return mid;
    }

    bool searchUpperHalf = descending ? (key < keys[mid]) : (key > keys[mid]);
    if (searchUpperHalf) {
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
}
215

216
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
102,832,963✔
217
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
218
  int32_t num = -1;
102,832,963✔
219
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
102,832,963✔
220

221
  if (order == TSDB_ORDER_ASC) {
102,832,963✔
222
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
87,404,942!
223
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
85,758,007!
224
      if (item != NULL) {
84,479,027!
225
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
226
      }
227
    } else {
228
      num = pDataBlockInfo->rows - startPos;
1,646,935✔
229
      if (item != NULL) {
1,646,935!
230
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
231
      }
232
    }
233
  } else {  // desc
234
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
15,428,021!
235
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
15,609,676!
236
      if (item != NULL) {
15,660,373!
237
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
238
      }
239
    } else {
UNCOV
240
      num = pDataBlockInfo->rows - startPos;
×
UNCOV
241
      if (item != NULL) {
×
242
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
243
      }
244
    }
245
  }
246

247
  return num;
101,604,680✔
248
}
249

250
/**
 * @brief Compute the interpolated border value of a time window for every interpolating function.
 *
 * For each expression in pSup:
 *  - functions for which fmIsIntervalInterpoFunc() is false only get start.key reset to INT64_MIN;
 *  - otherwise the values of the first parameter column at the previous and current row are read
 *    as doubles and, unless fmIsElapsedFunc() holds, linearly interpolated at windowKey. The result
 *    is stored in the start or end point of the context, selected by type.
 *
 * When prevRowIndex is -1 the previous value is taken from pPrevValues instead of the block.
 */
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
  SqlFunctionCtx* pCtxList = pSup->pCtx;

  // position inside pPrevValues; starts at 1 and advances once per interpolating function
  int32_t prevValIdx = 1;

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    SqlFunctionCtx* pFuncCtx = &pCtxList[k];

    if (!fmIsIntervalInterpoFunc(pFuncCtx->functionId)) {
      pFuncCtx->start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pParam = &pFuncCtx->param[0];
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

    double v1 = 0, v2 = 0, v = 0;
    if (prevRowIndex == -1) {
      SGroupKeys* p = taosArrayGet(pPrevValues, prevValIdx);
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData);
    } else {
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex));
    }

    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));

    SPoint left = (SPoint){.key = prevTs, .val = &v1};
    SPoint right = (SPoint){.key = curTs, .val = &v2};
    SPoint target = (SPoint){.key = windowKey, .val = &v};

    if (!fmIsElapsedFunc(pFuncCtx->functionId)) {
      taosGetLinearInterpolationVal(&target, TSDB_DATA_TYPE_DOUBLE, &left, &right, TSDB_DATA_TYPE_DOUBLE);
    }

    if (type == RESULT_ROW_START_INTERP) {
      pFuncCtx->start.key = target.key;
      pFuncCtx->start.val = v;
    } else {
      pFuncCtx->end.key = target.key;
      pFuncCtx->end.val = v;
    }

    prevValIdx += 1;
  }
}
319

320
// Mark one border of every function context as "not interpolated" by writing INT64_MIN into its
// key: the start border for RESULT_ROW_START_INTERP, the end border for any other type.
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  const bool resetStart = (type == RESULT_ROW_START_INTERP);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (resetStart) {
      pCtx[i].start.key = INT64_MIN;
    } else {
      pCtx[i].end.key = INT64_MIN;
    }
  }
}
331

332
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
13,047✔
333
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
334
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
13,047✔
335

336
  TSKEY curTs = tsCols[pos];
13,047✔
337

338
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
13,047✔
339
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
13,047✔
340

341
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
342
  // start exactly from this point, no need to do interpolation
343
  TSKEY key = ascQuery ? win->skey : win->ekey;
13,047!
344
  if (key == curTs) {
13,047✔
345
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
995✔
346
    return true;
995✔
347
  }
348

349
  // it is the first time window, no need to do interpolation
350
  if (pTsKey->isNull && pos == 0) {
12,052!
351
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
410✔
352
  } else {
353
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
11,642!
354
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
11,642✔
355
                              RESULT_ROW_START_INTERP, pSup);
356
  }
357

358
  return true;
12,052✔
359
}
360

361
/*
 * Prepare the end-border interpolation of a time window.
 *
 * *pRes is set to false when the window does not end inside the current block, and to true when the
 * end border has been handled (either a row sits exactly on it, or the value was interpolated
 * between endRowIndex and nextRowIndex).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when interpolation is needed
 * but nextRowIndex is negative (in which case *pRes is left untouched).
 */
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
  int32_t order = pInfo->binfo.inputTsOrder;
  TSKEY   lastRowTs = tsCols[endRowIndex];
  TSKEY   border = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;

  // the window continues into the next block: nothing to interpolate yet
  bool beyondBlockAsc = (order == TSDB_ORDER_ASC) && (border > blockEkey);
  bool beyondBlockDesc = (order == TSDB_ORDER_DESC) && (border < blockEkey);
  if (beyondBlockAsc || beyondBlockDesc) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = false;
    return TSDB_CODE_SUCCESS;
  }

  // a real row sits exactly on the border, so its value is used as is
  if (border == lastRowTs) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = true;
    return TSDB_CODE_SUCCESS;
  }

  if (nextRowIndex < 0) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, lastRowTs, endRowIndex, tsCols[nextRowIndex], nextRowIndex,
                            border, RESULT_ROW_END_INTERP, pSup);
  *pRes = true;
  return TSDB_CODE_SUCCESS;
}
396

397
// Tell whether pWin has to be calculated for the range [calStart, calEnd].
// When interval == sliding every window qualifies. Otherwise a window is rejected when it lies
// completely outside the range, or when the block is STREAM_PULL_DATA and the window starts
// before calStart.
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
  if (pInterval->interval == pInterval->sliding) {
    return true;
  }

  bool outsideRange = (pWin->ekey < calStart) || (pWin->skey > calEnd);
  if (outsideRange) {
    return false;
  }

  bool pulledBeforeStart = (blockType == STREAM_PULL_DATA) && (pWin->skey < calStart);
  return !pulledBeforeStart;
}
405

406
// Same check as inCalSlidingWindow, using the calculation window and block type stored in pBlockInfo.
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  TSKEY calStart = pBlockInfo->calWin.skey;
  TSKEY calEnd = pBlockInfo->calWin.ekey;
  return inCalSlidingWindow(pInterval, pWin, calStart, calEnd, pBlockInfo->type);
}
409

410
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
104,094,302✔
411
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
412
  bool ascQuery = (order == TSDB_ORDER_ASC);
104,094,302✔
413

414
  int32_t precision = pInterval->precision;
104,094,302✔
415
  getNextTimeWindow(pInterval, pNext, order);
104,094,302✔
416

417
  // next time window is not in current block
418
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
103,502,839!
419
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
101,906,429✔
420
    return -1;
1,608,183✔
421
  }
422

423
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
101,894,656!
424
    return -1;
30✔
425
  }
426

427
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
102,011,209✔
428
  int32_t startPos = 0;
102,011,209✔
429

430
  // tumbling time window query, a special case of sliding time window query
431
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
102,011,209!
432
    startPos = prevPosition + 1;
94,609,612✔
433
  } else {
434
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
7,401,597!
435
      startPos = 0;
210,952✔
436
    } else {
437
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
7,190,645✔
438
    }
439
  }
440

441
  /* interp query with fill should not skip time window */
442
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
443
  //    return startPos;
444
  //  }
445

446
  /*
447
   * This time window does not cover any data, try next time window,
448
   * this case may happen when the time window is too small
449
   */
450
  if (primaryKeys != NULL) {
101,393,988✔
451
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
145,243,849✔
452
      TSKEY next = primaryKeys[startPos];
43,848,272✔
453
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
43,848,272✔
454
        pNext->skey = taosTimeTruncate(next, pInterval);
12,582✔
455
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
456
      } else {
457
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
43,835,690✔
458
        pNext->skey = pNext->ekey - pInterval->interval + 1;
43,835,690✔
459
      }
460
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
57,508,971✔
461
      TSKEY next = primaryKeys[startPos];
14,499,701✔
462
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
14,499,701!
463
        pNext->skey = taosTimeTruncate(next, pInterval);
×
464
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
465
      } else {
466
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
14,516,872✔
467
        pNext->ekey = pNext->skey + pInterval->interval - 1;
14,516,872✔
468
      }
469
    }
470
  }
471

472
  return startPos;
101,449,493✔
473
}
474

475
// Report whether the requested border (start for RESULT_ROW_START_INTERP, end otherwise) of the
// result row is already flagged as interpolated.
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
  return (type == RESULT_ROW_START_INTERP) ? (pResult->startInterp == true) : (pResult->endInterp == true);
}
482

483
// Flag one border of the result row as interpolated: the start border for RESULT_ROW_START_INTERP,
// the end border for any other type.
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  if (type != RESULT_ROW_START_INTERP) {
    pResult->endInterp = true;
    return;
  }

  pResult->startInterp = true;
}
490

491
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
94,960,625✔
492
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
493
  int32_t code = TSDB_CODE_SUCCESS;
94,960,625✔
494
  int32_t lino = 0;
94,960,625✔
495
  if (!pInfo->timeWindowInterpo) {
94,960,625!
496
    return code;
95,343,922✔
497
  }
498

UNCOV
499
  if (pBlock == NULL) {
×
500
    code = TSDB_CODE_INVALID_PARA;
×
501
    return code;
×
502
  }
503

UNCOV
504
  if (pBlock->pDataBlock == NULL) {
×
505
    return code;
×
506
  }
507

UNCOV
508
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
×
509

510
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
13,051✔
511
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
13,051✔
512
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
13,051✔
513
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
13,047✔
514
    if (interp) {
13,047!
515
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
13,047✔
516
    }
517
  } else {
518
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
4✔
519
  }
520

521
  // point interpolation does not require the end key time window interpolation.
522
  // interpolation query does not generate the time window end interpolation
523
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
13,051✔
524
  if (!done) {
13,051!
525
    int32_t endRowIndex = startPos + forwardRows - 1;
13,051✔
526
    int32_t nextRowIndex = endRowIndex + 1;
13,051✔
527

528
    // duplicated ts row does not involve in the interpolation of end value for current time window
529
    int32_t x = endRowIndex;
13,051✔
530
    while (x > 0) {
13,069✔
531
      if (tsCols[x] == tsCols[x - 1]) {
12,600✔
532
        x -= 1;
18✔
533
      } else {
534
        endRowIndex = x;
12,582✔
535
        break;
12,582✔
536
      }
537
    }
538

539
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
13,051!
540
    bool  interp = false;
13,051✔
541
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols,
13,051✔
542
                                           endKey, win, &interp);
543
    QUERY_CHECK_CODE(code, lino, _end);
13,051!
544
    if (interp) {
13,051✔
545
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
12,550✔
546
    }
547
  } else {
548
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
549
  }
550

551
_end:
13,051✔
552
  if (code != TSDB_CODE_SUCCESS) {
13,051!
553
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
554
  }
555
  return code;
13,051✔
556
}
557

558
/*
 * For every entry of pPrevKeys, copy the last non-NULL value of the matching column (pCols[k]) of
 * the block into the entry's buffer. Entries whose column holds only NULLs are left unchanged.
 * Variable-length values are copied with their full length (varDataTLen), others with pkey->bytes.
 */
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  size_t numOfKeys = taosArrayGetSize(pPrevKeys);
  for (int32_t k = 0; k < numOfKeys; ++k) {
    SColumn*         pCol = taosArrayGet(pCols, k);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    SGroupKeys*      pSaved = taosArrayGet(pPrevKeys, k);

    // walk backwards to the last row that actually carries a value
    int32_t row = pBlock->info.rows - 1;
    while (row >= 0 && colDataIsNull_s(pColData, row)) {
      --row;
    }

    if (row < 0) {
      continue;
    }

    char* pVal = colDataGetData(pColData, row);
    if (IS_VAR_DATA_TYPE(pSaved->type)) {
      memcpy(pSaved->pData, pVal, varDataTLen(pVal));
    } else {
      memcpy(pSaved->pData, pVal, pSaved->bytes);
    }
  }
}
586

587
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
501✔
588
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
589
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
501✔
590

591
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
501✔
592
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
501✔
593

594
  int32_t startPos = 0;
501✔
595
  int32_t numOfOutput = pSup->numOfExprs;
501✔
596

597
  SResultRow* pResult = NULL;
501✔
598

599
  while (1) {
×
600
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
501✔
601
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
501✔
602
    uint64_t            groupId = pOpenWin->groupId;
501✔
603
    SResultRowPosition* p1 = &pOpenWin->pos;
501✔
604
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
501!
605
      break;
501✔
606
    }
607

608
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
609
    if (NULL == pr) {
×
610
      T_LONG_JMP(pTaskInfo->env, terrno);
×
611
    }
612

613
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
614
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
615
      T_LONG_JMP(pTaskInfo->env, terrno);
×
616
    }
617

618
    if (pr->closed) {
×
619
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
620
             isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)) ) {
×
621
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
622
        T_LONG_JMP(pTaskInfo->env, terrno);
×
623
      }
624
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
625
      taosMemoryFree(pNode);
×
626
      continue;
×
627
    }
628

629
    STimeWindow w = pr->win;
×
630
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
631
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
632
    if (ret != TSDB_CODE_SUCCESS) {
×
633
      T_LONG_JMP(pTaskInfo->env, ret);
×
634
    }
635

636
    if(isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
637
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
638
      T_LONG_JMP(pTaskInfo->env, terrno);
×
639
    }
640

641
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
642
    if (!pTsKey) {
×
643
      pTaskInfo->code = terrno;
×
644
      T_LONG_JMP(pTaskInfo->env, terrno);
×
645
    }
646

647
    int64_t     prevTs = *(int64_t*)pTsKey->pData;
×
648
    if (groupId == pBlock->info.id.groupId) {
×
649
      TSKEY curTs = pBlock->info.window.skey;
×
650
      if (tsCols != NULL) {
×
651
        curTs = tsCols[startPos];
×
652
      }
653
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
654
                                RESULT_ROW_END_INTERP, pSup);
655
    }
656

657
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
658
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
659

660
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
661
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
662
                                          pBlock->info.rows, numOfExprs);
×
663
    if (ret != TSDB_CODE_SUCCESS) {
×
664
      T_LONG_JMP(pTaskInfo->env, ret);
×
665
    }
666

667
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
668
      closeResultRow(pr);
×
669
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
670
      taosMemoryFree(pNode);
×
671
    } else {  // the remains are can not be closed yet.
672
      break;
×
673
    }
674
  }
675
}
501✔
676

677
static bool tsKeyCompFn(void* l, void* r, void* param) {
11,928,830✔
678
  TSKEY*                    lTS = (TSKEY*)l;
11,928,830✔
679
  TSKEY*                    rTS = (TSKEY*)r;
11,928,830✔
680
  SIntervalAggOperatorInfo* pInfo = param;
11,928,830✔
681
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
11,928,830✔
682
}
683

684
// Check whether the result-row hash table already holds an entry for (win->skey, tableGroupId).
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
  char winKey[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
  SET_RES_WINDOW_KEY(winKey, (char*)&win->skey, sizeof(TSKEY), tableGroupId);

  void* pEntry = tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, winKey, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
  return pEntry != NULL;
}
689

690
/**
691
 * @brief check if cur window should be filtered out by limit info
692
 * @retval true if should be filtered out
693
 * @retval false if not filtering out
694
 * @note If no limit info, we skip filtering.
695
 *       If input/output ts order mismatch, we skip filtering too.
696
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
697
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
698
 *       every tuple in every block.
699
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
700
 */
701
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId, SExecTaskInfo* pTaskInfo) {
96,413,470✔
702
  int32_t code = TSDB_CODE_SUCCESS;
96,413,470✔
703
  int32_t lino = 0;
96,413,470✔
704
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
96,413,470✔
705
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
10,156,658✔
706
      // if input/output ts order mismatch, no filter
707
  ) {
708
    return false;
92,843,230✔
709
  }
710

711
  if (pOperatorInfo->limit == 0) return true;
3,570,240✔
712

713
  if (pOperatorInfo->pBQ == NULL) {
3,569,625✔
714
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
11,781✔
715
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
11,786!
716
  }
717

718
  bool shouldFilter = false;
3,569,630✔
719
  // if BQ has been full, compare it with top of BQ
720
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
3,569,630✔
721
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
1,569,207✔
722
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
1,567,604✔
723
  }
724
  if (shouldFilter) {
3,542,764✔
725
    return true;
87,747✔
726
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
3,455,017✔
727
    return false;
2,042,163✔
728
  }
729

730
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
731
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
1,482,035!
732
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
1,510,883!
733

734
  *((TSKEY*)node.data) = win->skey;
1,510,883✔
735

736
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
1,510,883!
737
    taosMemoryFree(node.data);
×
738
    return true;
×
739
  }
740

741
_end:
1,407,555✔
742
  if (code != TSDB_CODE_SUCCESS) {
1,407,555!
743
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
744
    pTaskInfo->code = code;
×
745
    T_LONG_JMP(pTaskInfo->env, code);
×
746
  }
747
  return false;
1,407,555✔
748
}
749

750
// Aggregate one input block into every interval window it touches.
//
// The first window is derived from the start timestamp obtained through getStartTsKey(); the following
// windows are walked with getNextQualifiedWindow() until no window qualifies any more or
// filterWindowWithLimit() rejects one.
//
// Returns true only when a block of a new group shows up after more than `slimit` groups have been
// handled; returns false otherwise. Every failure leaves the function through T_LONG_JMP.
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
  SExecTaskInfo*            pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;

  int32_t     rowPos = 0;
  int32_t     numOfOutput = pSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
  uint64_t    groupId = pBlock->info.id.groupId;
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  bool        mainScan = (scanFlag == MAIN_SCAN);
  TSKEY       startTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pRow = NULL;

  // a block that belongs to another group: count it and drop the bounded queue kept for the previous group
  if (groupId != pInfo->curGroupId) {
    pInfo->handledGroupNum += 1;
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
      return true;
    }

    pInfo->curGroupId = groupId;
    destroyBoundedQueue(pInfo->pBQ);
    pInfo->pBQ = NULL;
  }

  STimeWindow curWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, startTs, &pInfo->interval,
                                           pInfo->binfo.inputTsOrder);
  if (filterWindowWithLimit(pInfo, &curWin, groupId, pTaskInfo)) {
    return false;
  }

  int32_t rc = setTimeWindowOutputBuf(pResultRowInfo, &curWin, mainScan, &pRow, groupId, pSup->pCtx, numOfOutput,
                                      pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (pRow == NULL || rc != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  // the key that bounds the current window in scan direction
  TSKEY   boundKey = ascScan ? curWin.ekey : curWin.skey;
  int32_t numRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, rowPos, boundKey, binarySearchForKey, NULL,
                                             pInfo->binfo.inputTsOrder);

  // the previous time window has not been interpolated yet
  if (pInfo->timeWindowInterpo) {
    SResultRowPosition openPos = addToOpenWindowList(pResultRowInfo, pRow, groupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &openPos);

    // switch the output buffer back to the current time window
    rc = setTimeWindowOutputBuf(pResultRowInfo, &curWin, mainScan, &pRow, groupId, pSup->pCtx, numOfOutput,
                                pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (rc != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, rc);
    }

    // interpolate at the window start key
    rc = doWindowBorderInterpolation(pInfo, pBlock, pRow, &curWin, rowPos, numRows, pSup);
    if (rc != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, rc);
    }
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curWin, 1);
  rc = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, rowPos, numRows,
                                       pBlock->info.rows, numOfOutput);
  if (rc != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  doCloseWindow(pResultRowInfo, pInfo, pRow);

  // walk the remaining windows covered by this block
  STimeWindow nextWin = curWin;
  for (;;) {
    int32_t prevEndPos = numRows - 1 + rowPos;
    rowPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                    pInfo->binfo.inputTsOrder);
    if (rowPos < 0) {
      break;
    }
    if (filterWindowWithLimit(pInfo, &nextWin, groupId, pTaskInfo)) {
      break;
    }

    // null data, failed to allocate more memory buffer
    rc = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, mainScan, &pRow, groupId, pSup->pCtx, numOfOutput,
                                pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (pRow == NULL || rc != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, rc);
    }

    boundKey = ascScan ? nextWin.ekey : nextWin.skey;
    numRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, rowPos, boundKey, binarySearchForKey, NULL,
                                       pInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    rc = doWindowBorderInterpolation(pInfo, pBlock, pRow, &nextWin, rowPos, numRows, pSup);
    if (rc != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, rc);
    }

    // TODO: add to open window? how to close the open windows after input blocks exhausted?
    // (a disabled sketch used to interpolate the border only when the bound key was inside the block's
    //  ts window, and to put the row on the open window list otherwise when interpolation is enabled)

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
    rc = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, rowPos, numRows,
                                         pBlock->info.rows, numOfOutput);
    if (rc != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, rc);
    }

    doCloseWindow(pResultRowInfo, pInfo, pRow);
  }

  // remember the last row of this block for the interpolation of the next block
  if (pInfo->timeWindowInterpo) {
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }

  return false;
}
865

866
// Close the result row once its end border has been interpolated, and drop the head entry of the open
// window list. Nothing happens when interpolation is disabled or the end border is not interpolated yet.
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }

  if (!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    return;
  }

  // the computation of the final result for this row is finished
  closeResultRow(pResult);

  SListNode* pHead = tdListPopHead(pResultRowInfo->openWindow);
  taosMemoryFree(pHead);
}
97,338,981✔
874

875
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
501✔
876
                                       SExecTaskInfo* pTaskInfo) {
877
  int32_t         code = TSDB_CODE_SUCCESS;
501✔
878
  int32_t         lino = 0;
501✔
879
  SOpenWindowInfo openWin = {0};
501✔
880
  openWin.pos.pageId = pResult->pageId;
501✔
881
  openWin.pos.offset = pResult->offset;
501✔
882
  openWin.groupId = groupId;
501✔
883
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
501✔
884
  if (pn == NULL) {
501✔
885
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
499✔
886
    QUERY_CHECK_CODE(code, lino, _end);
499!
887
    return openWin.pos;
499✔
888
  }
889

890
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
2✔
891
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
2!
892
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
893
    QUERY_CHECK_CODE(code, lino, _end);
×
894
  }
895

896
_end:
2✔
897
  if (code != TSDB_CODE_SUCCESS) {
2!
898
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
899
    pTaskInfo->code = code;
×
900
    T_LONG_JMP(pTaskInfo->env, code);
×
901
  }
902
  return openWin.pos;
2✔
903
}
904

905
// Return the data of the primary timestamp column of pBlock.
//
// NULL is returned when the block has no column array or its data is not loaded. When the first
// timestamp is non-zero while the block window is still {0, 0}, the window is recomputed with
// blockDataUpdateTsWindow(). A missing column or a failed window update is stored in pTaskInfo->code and
// raised through T_LONG_JMP.
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  TSKEY* tsCols = NULL;

  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return tsCols;
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  if (!pColDataInfo) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  tsCols = (int64_t*)pColDataInfo->pData;

  // The checks below read tsCols[0] and tsCols[rows - 1]; without a buffer or without rows those reads
  // would be out of bounds, so there is nothing to inspect.
  if (tsCols == NULL || pBlock->info.rows <= 0) {
    return tsCols;
  }

  if (tsCols[0] == 0) {
    qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0], tsCols[pBlock->info.rows - 1]);
  }

  if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
    int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
932

933
// Open the interval operator: drain the downstream operator and aggregate every block into its time
// windows, then prepare the grouped result info for reading.
//
// Returns TSDB_CODE_SUCCESS immediately when the operator is already opened. Any failure is logged,
// stored in pTaskInfo->code and raised through T_LONG_JMP.
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                pSup = &pOperator->exprSupp;

  int32_t scanFlag = MAIN_SCAN;
  int64_t st = taosGetTimestampUs();

  // the grouped result info is not valid until initGroupedResultInfo() below has succeeded
  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    scanFlag = pBlock->info.scanFlag;
    pInfo->binfo.pRes->info.scanFlag = scanFlag;

    // scalar expressions are evaluated in place before the block is aggregated
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    // hashIntervalAgg() returns true once the slimit quota is exhausted
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) {
      break;
    }
  }

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  OPTR_SET_OPENED(pOperator);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
986

987
// Store `val` as the state value of the window that is currently being built. Variable-length types are
// copied with varDataCopy(), every other type with a plain copy of `bytes` bytes.
static void doKeepStateWindowKey(SStateWindowOperatorInfo* pInfo, char* val, int32_t bytes) {
  if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
    varDataCopy(pInfo->stateKey.pData, val);
  } else {
    memcpy(pInfo->stateKey.pData, val, bytes);
  }
}

// Split one input block into state windows and aggregate each of them.
//
// Rows whose state column is NULL are skipped. A row starts a new window when its group differs from the
// group kept in pInfo->winSup, when no state value has been recorded yet, or when its state value does
// not match the recorded one; in the last case the window collected so far is aggregated first. The rows
// collected for the trailing window are aggregated after the loop with the last timestamp of the block
// as window end. Failures are raised through T_LONG_JMP.
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
  if (!pStateColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }
  int64_t gid = pBlock->info.id.groupId;

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int32_t bytes = pStateColInfoData->info.bytes;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (!pColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  // the pre-aggregation entry of the state column does not depend on the row, look it up once
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
      continue;
    }

    char* val = colDataGetData(pStateColInfoData, j);

    if (gid != pRowSup->groupId || !pInfo->hasKey) {
      // first usable row of a group, or no state value recorded so far
      doKeepStateWindowKey(pInfo, val, bytes);
      pInfo->hasKey = true;

      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
    } else if (compareVal(val, &pInfo->stateKey)) {
      // same state value, the row extends the current window
      doKeepTuple(pRowSup, tsList[j], gid);
    } else {  // a new state window started
      SResultRow* pResult = NULL;

      // keep the time window for the closed time window.
      STimeWindow window = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
        T_LONG_JMP(pTaskInfo->env, ret);
      }

      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
      if (ret != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, ret);
      }

      // here we start a new state window
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);

      doKeepStateWindowKey(pInfo, val, bytes);
    }
  }

  // aggregate the rows collected for the trailing window of this block
  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
8,235✔
1084

1085
// Open the state-window operator: drain the downstream operator, aggregate every block into state
// windows and prepare the grouped result info for reading.
//
// Returns TSDB_CODE_SUCCESS immediately when the operator is already opened. Any failure is logged,
// stored in pTaskInfo->code and raised through T_LONG_JMP.
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = pInfo->binfo.inputTsOrder;
  int64_t    st = taosGetTimestampUs();

  // the grouped result info is not valid until initGroupedResultInfo() below has succeeded
  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      // keep the result in the local code so that pTaskInfo->code is only written on failure, and so that
      // the failure is logged by the common handler below
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doStateWindowAggImpl(pOperator, pInfo, pBlock);
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;
  pOperator->status = OP_RES_TO_RETURN;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1140

1141
// Produce the next result block of the state-window operator.
//
// *ppRes is set to the operator's result block, or to NULL when the operator is already done or the
// block built by this call holds no rows. Any failure is logged, stored in pTaskInfo->code and raised
// through T_LONG_JMP.
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SOptrBasicInfo*           pBasic = &pInfo->binfo;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  // keep building until the filtered block holds rows or the grouped results are used up
  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pBasic->pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pBasic->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pBasic->pRes->info.rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pBasic->pRes;
  }
  return code;
}
1186

1187
// Produce the next result block of the interval operator.
//
// *ppRes is set to the operator's result block, or to NULL when the operator is already done or the
// block built by this call holds no rows. Any failure is logged, stored in pTaskInfo->code and raised
// through T_LONG_JMP.
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  size_t                    numOfRows = 0;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = pInfo->binfo.pRes;
  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  // keep building until the filtered block holds rows or the grouped results are used up
  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pRes->info.rows > 0) {
      break;
    }
  }

  numOfRows = pRes->info.rows;
  pOperator->resultInfo.totalRows += numOfRows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (numOfRows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pRes;
  }
  return code;
}
1230

1231
static void destroyStateWindowOperatorInfo(void* param) {
6,851✔
1232
  if (param == NULL) {
6,851!
1233
    return;
×
1234
  }
1235
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
6,851✔
1236
  cleanupBasicInfo(&pInfo->binfo);
6,851✔
1237
  taosMemoryFreeClear(pInfo->stateKey.pData);
6,851!
1238
  if (pInfo->pOperator) {
6,851!
1239
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
6,851✔
1240
                      pInfo->cleanGroupResInfo);
6,851✔
1241
    pInfo->pOperator = NULL;
6,851✔
1242
  }
1243

1244
  cleanupExprSupp(&pInfo->scalarSup);
6,851✔
1245
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
6,851✔
1246
  cleanupAggSup(&pInfo->aggSup);
6,851✔
1247
  cleanupGroupResInfo(&pInfo->groupResInfo);
6,851✔
1248

1249
  taosMemoryFreeClear(param);
6,851!
1250
}
1251

1252
static void freeItem(void* param) {
1,756✔
1253
  SGroupKeys* pKey = (SGroupKeys*)param;
1,756✔
1254
  taosMemoryFree(pKey->pData);
1,756!
1255
}
1,756✔
1256

1257
void destroyIntervalOperatorInfo(void* param) {
71,095✔
1258
  if (param == NULL) {
71,095!
1259
    return;
×
1260
  }
1261

1262
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
71,095✔
1263

1264
  cleanupBasicInfo(&pInfo->binfo);
71,095✔
1265
  if (pInfo->pOperator) {
71,105!
1266
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
71,105✔
1267
                      pInfo->cleanGroupResInfo);
71,105✔
1268
    pInfo->pOperator = NULL;
71,096✔
1269
  }
1270

1271
  cleanupAggSup(&pInfo->aggSup);
71,096✔
1272
  cleanupExprSupp(&pInfo->scalarSupp);
71,101✔
1273

1274
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
71,101✔
1275

1276
  taosArrayDestroy(pInfo->pInterpCols);
71,101✔
1277
  pInfo->pInterpCols = NULL;
71,101✔
1278

1279
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
71,101✔
1280
  pInfo->pPrevValues = NULL;
71,097✔
1281

1282
  cleanupGroupResInfo(&pInfo->groupResInfo);
71,097✔
1283
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
71,104✔
1284
  destroyBoundedQueue(pInfo->pBQ);
71,106✔
1285
  taosMemoryFreeClear(param);
71,099!
1286
}
1287

1288
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
71,067✔
1289
                                   bool* pRes) {
1290
  // the primary timestamp column
1291
  bool    needed = false;
71,067✔
1292
  int32_t code = TSDB_CODE_SUCCESS;
71,067✔
1293
  int32_t lino = 0;
71,067✔
1294
  void*   tmp = NULL;
71,067✔
1295

1296
  for (int32_t i = 0; i < numOfCols; ++i) {
350,735✔
1297
    SExprInfo* pExpr = pCtx[i].pExpr;
280,168✔
1298
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
280,168✔
1299
      needed = true;
495✔
1300
      break;
495✔
1301
    }
1302
  }
1303

1304
  if (needed) {
71,062✔
1305
    pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
495✔
1306
    QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
495!
1307

1308
    pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
495✔
1309
    QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
495!
1310

1311
    {  // ts column
1312
      SColumn c = {0};
495✔
1313
      c.colId = 1;
495✔
1314
      c.slotId = pInfo->primaryTsIndex;
495✔
1315
      c.type = TSDB_DATA_TYPE_TIMESTAMP;
495✔
1316
      c.bytes = sizeof(int64_t);
495✔
1317
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
495✔
1318
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
495!
1319

1320
      SGroupKeys key;
1321
      key.bytes = c.bytes;
495✔
1322
      key.type = c.type;
495✔
1323
      key.isNull = true;  // to denote no value is assigned yet
495✔
1324
      key.pData = taosMemoryCalloc(1, c.bytes);
495!
1325
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
495!
1326

1327
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
495✔
1328
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
495!
1329
    }
1330
  }
1331

1332
  for (int32_t i = 0; i < numOfCols; ++i) {
353,837✔
1333
    SExprInfo* pExpr = pCtx[i].pExpr;
282,765✔
1334

1335
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
282,765✔
1336
      SFunctParam* pParam = &pExpr->base.pParam[0];
1,250✔
1337

1338
      SColumn c = *pParam->pCol;
1,250✔
1339
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
1,250✔
1340
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,261!
1341

1342
      SGroupKeys key = {0};
1,261✔
1343
      key.bytes = c.bytes;
1,261✔
1344
      key.type = c.type;
1,261✔
1345
      key.isNull = false;
1,261✔
1346
      key.pData = taosMemoryCalloc(1, c.bytes);
1,261!
1347
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
1,261!
1348

1349
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
1,261✔
1350
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,261!
1351
    }
1352
  }
1353

1354
_end:
71,072✔
1355
  if (code != TSDB_CODE_SUCCESS) {
71,072!
1356
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1357
  }
1358
  *pRes = needed;
71,076✔
1359
  return code;
71,076✔
1360
}
1361

1362
// Create the hash interval aggregation operator described by pPhyNode on top of `downstream`.
//
// On success *pOptrInfo receives the new operator and TSDB_CODE_SUCCESS is returned. On failure the
// partially built operator info is destroyed, destroyOperatorAndDownstreams() is called for the operator
// and the downstream, the error is stored in pTaskInfo->code and returned.
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pSup->hasWindowOrGroup = true;

  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 512);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                    &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  SInterval interval = {.interval = pPhyNode->interval,
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
                        .timeRange = pPhyNode->timeRange};
  calcIntervalAutoOffset(&interval);

  STimeWindowAggSupp as = {
      .waterMark = pPhyNode->window.watermark,
      .calTrigger = pPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
  };

  pInfo->win = pTaskInfo->window;
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
  pInfo->interval = interval;
  pInfo->twAggSup = as;
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;

  // LIMIT: the number of windows to keep is limit + offset
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
    pInfo->limited = true;
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
  }

  // SLIMIT: the number of groups to handle is slimit + soffset
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
    pInfo->slimited = true;
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
    pInfo->curGroupId = UINT64_MAX;
  }

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->timeWindowInterpo = false;
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
  QUERY_CHECK_CODE(code, lino, _error);
  if (pInfo->timeWindowInterpo) {
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
    // a missing list must surface as an error code; jumping to _error with code still being success
    // would report success without an operator
    // NOTE(review): relies on terrno being set when tdListNew() fails - confirm
    QUERY_CHECK_NULL(pInfo->binfo.resultRowInfo.openWindow, code, lino, _error, terrno);
  }

  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);

  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }

  // pInfo has just been released; make sure the operator no longer refers to it
  // NOTE(review): presumably destroyOperatorAndDownstreams() would otherwise hand pOperator->info to the
  // close function a second time - confirm against its implementation
  if (pOperator != NULL) {
    pOperator->info = NULL;
  }

  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
1489

1490
// todo handle multiple timeline cases. assume no timeline interweaving
1491
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
12,374✔
1492
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
12,374✔
1493
  SExprSupp*     pSup = &pOperator->exprSupp;
12,374✔
1494

1495
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
12,374✔
1496
  if (!pColInfoData) {
12,374!
1497
    pTaskInfo->code = terrno;
×
1498
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1499
  }
1500

1501
  bool    masterScan = true;
12,374✔
1502
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
12,374✔
1503
  int64_t gid = pBlock->info.id.groupId;
12,374✔
1504

1505
  int64_t gap = pInfo->gap;
12,374✔
1506

1507
  if (!pInfo->reptScan) {
12,374✔
1508
    pInfo->reptScan = true;
4,044✔
1509
    pInfo->winSup.prevTs = INT64_MIN;
4,044✔
1510
  }
1511

1512
  SWindowRowsSup* pRowSup = &pInfo->winSup;
12,374✔
1513
  pRowSup->numOfRows = 0;
12,374✔
1514
  pRowSup->startRowIndex = 0;
12,374✔
1515

1516
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
1517
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
12,374✔
1518
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
253,998✔
1519
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
241,624✔
1520
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
12,364✔
1521
      doKeepTuple(pRowSup, tsList[j], gid);
12,364✔
1522
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
229,260✔
1523
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
43,780!
1524
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1525
      doKeepTuple(pRowSup, tsList[j], gid);
185,500✔
1526
    } else {  // start a new session window
1527
      // start a new session window
1528
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
43,760✔
1529
        SResultRow* pResult = NULL;
43,751✔
1530

1531
        // keep the time window for the closed time window.
1532
        STimeWindow window = pRowSup->win;
43,751✔
1533

1534
        int32_t ret =
1535
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
43,751✔
1536
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1537
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
43,751!
1538
          T_LONG_JMP(pTaskInfo->env, ret);
×
1539
        }
1540

1541
        // pInfo->numOfRows data belong to the current session window
1542
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
43,751✔
1543
        ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
43,751✔
1544
                                              pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
43,751✔
1545
        if (ret != TSDB_CODE_SUCCESS) {
43,751!
1546
          T_LONG_JMP(pTaskInfo->env, ret);
×
1547
        }
1548
      }
1549

1550
      // here we start a new session window
1551
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
43,760✔
1552
      doKeepTuple(pRowSup, tsList[j], gid);
43,760✔
1553
    }
1554
  }
1555

1556
  SResultRow* pResult = NULL;
12,374✔
1557
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
12,374✔
1558
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
12,374✔
1559
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1560
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
12,374!
1561
    T_LONG_JMP(pTaskInfo->env, ret);
×
1562
  }
1563

1564
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
12,374✔
1565
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
12,374✔
1566
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
12,374✔
1567
  if (ret != TSDB_CODE_SUCCESS) {
12,374!
1568
    T_LONG_JMP(pTaskInfo->env, ret);
×
1569
  }
1570
}
12,374✔
1571

1572
// Produces the next result block of the session window operator.
//
// On the first call (status is neither OP_EXEC_DONE nor OP_RES_TO_RETURN) every downstream block is consumed and
// aggregated, then the grouped result info is initialised and the status switches to OP_RES_TO_RETURN. Each call
// afterwards only builds and filters the next batch of results.
//
// *ppRes receives the result block, or NULL when the block holds no rows. Failures are logged, stored in
// pTaskInfo->code and reported by long-jumping to pTaskInfo->env.
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
  SExprSupp*               pSup = &pOperator->exprSupp;

  if (pOperator->status != OP_RES_TO_RETURN) {
    int64_t st = taosGetTimestampUs();
    int32_t order = pInfo->binfo.inputTsOrder;

    pInfo->cleanGroupResInfo = false;
    while (1) {
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
      if (pBlock == NULL) {
        break;
      }

      pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
      if (pInfo->scalarSupp.pExprInfo != NULL) {
        // evaluate the scalar expressions in place before aggregating
        SExprSupp* pExprSup = &pInfo->scalarSupp;
        code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }
      // the pDataBlock are always the same one, no need to call this again
      code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
      QUERY_CHECK_CODE(code, lino, _end);

      doSessionWindowAggImpl(pOperator, pInfo, pBlock);
    }

    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

    // all input consumed; from now on only results are returned
    pOperator->status = OP_RES_TO_RETURN;

    code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->cleanGroupResInfo = true;

    code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // Build result batches until one survives the filter or no results remain.
  while (1) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
    if (!hasRemain) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
  return code;
}
1671

1672
// todo make this a non-blocking operator
1673
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
6,851✔
1674
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1675
  QRY_PARAM_CHECK(pOptrInfo);
6,851!
1676

1677
  int32_t                   code = TSDB_CODE_SUCCESS;
6,851✔
1678
  int32_t                   lino = 0;
6,851✔
1679
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
6,851!
1680
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
6,851!
1681
  if (pInfo == NULL || pOperator == NULL) {
6,851!
1682
    code = terrno;
×
1683
    goto _error;
×
1684
  }
1685

1686
  pOperator->exprSupp.hasWindowOrGroup = true;
6,851✔
1687
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
6,851✔
1688
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
6,851✔
1689

1690
  if (pStateNode->window.pExprs != NULL) {
6,851✔
1691
    int32_t    numOfScalarExpr = 0;
4,987✔
1692
    SExprInfo* pScalarExprInfo = NULL;
4,987✔
1693
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
4,987✔
1694
    QUERY_CHECK_CODE(code, lino, _error);
4,987!
1695

1696
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
4,987✔
1697
    if (code != TSDB_CODE_SUCCESS) {
4,987!
1698
      goto _error;
×
1699
    }
1700
  }
1701

1702
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
6,851✔
1703
  pInfo->stateKey.type = pInfo->stateCol.type;
6,851✔
1704
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
6,851✔
1705
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
6,851!
1706
  if (pInfo->stateKey.pData == NULL) {
6,851!
1707
    goto _error;
×
1708
  }
1709
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
6,851✔
1710
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
6,851✔
1711

1712
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
6,851✔
1713
  if (code != TSDB_CODE_SUCCESS) {
6,851!
1714
    goto _error;
×
1715
  }
1716

1717
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
6,851✔
1718

1719
  int32_t    num = 0;
6,851✔
1720
  SExprInfo* pExprInfo = NULL;
6,851✔
1721
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
6,851✔
1722
  QUERY_CHECK_CODE(code, lino, _error);
6,851!
1723

1724
  initResultSizeInfo(&pOperator->resultInfo, 4096);
6,851✔
1725

1726
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
6,851✔
1727
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
6,851✔
1728
  if (code != TSDB_CODE_SUCCESS) {
6,851!
1729
    goto _error;
×
1730
  }
1731

1732
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
6,851✔
1733
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
6,851!
1734
  initBasicInfo(&pInfo->binfo, pResBlock);
6,851✔
1735
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
6,851✔
1736

1737
  pInfo->twAggSup =
6,851✔
1738
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
6,851✔
1739

1740
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
6,851✔
1741
  QUERY_CHECK_CODE(code, lino, _error);
6,851!
1742

1743
  pInfo->tsSlotId = tsSlotId;
6,851✔
1744
  pInfo->pOperator = pOperator;
6,851✔
1745
  pInfo->cleanGroupResInfo = false;
6,851✔
1746
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
6,851✔
1747
                  pTaskInfo);
1748
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
6,851✔
1749
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1750

1751
  code = appendDownstream(pOperator, &downstream, 1);
6,851✔
1752
  if (code != TSDB_CODE_SUCCESS) {
6,851!
1753
    goto _error;
×
1754
  }
1755

1756
  *pOptrInfo = pOperator;
6,851✔
1757
  return TSDB_CODE_SUCCESS;
6,851✔
1758

1759
_error:
×
1760
  if (pInfo != NULL) {
×
1761
    destroyStateWindowOperatorInfo(pInfo);
×
1762
  }
1763

1764
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1765
  pTaskInfo->code = code;
×
1766
  return code;
×
1767
}
1768

1769
void destroySWindowOperatorInfo(void* param) {
4,074✔
1770
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
4,074✔
1771
  if (pInfo == NULL) {
4,074!
1772
    return;
×
1773
  }
1774

1775
  cleanupBasicInfo(&pInfo->binfo);
4,074✔
1776
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
4,074✔
1777
  if (pInfo->pOperator) {
4,074!
1778
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
4,074✔
1779
                      pInfo->cleanGroupResInfo);
4,074✔
1780
    pInfo->pOperator = NULL;
4,074✔
1781
  }
1782

1783
  cleanupAggSup(&pInfo->aggSup);
4,074✔
1784
  cleanupExprSupp(&pInfo->scalarSupp);
4,074✔
1785

1786
  cleanupGroupResInfo(&pInfo->groupResInfo);
4,074✔
1787
  taosMemoryFreeClear(param);
4,074!
1788
}
1789

1790
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
4,074✔
1791
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1792
  QRY_PARAM_CHECK(pOptrInfo);
4,074!
1793

1794
  int32_t                  code = TSDB_CODE_SUCCESS;
4,074✔
1795
  int32_t                  lino = 0;
4,074✔
1796
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
4,074!
1797
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4,074!
1798
  if (pInfo == NULL || pOperator == NULL) {
4,074!
1799
    code = terrno;
×
1800
    goto _error;
×
1801
  }
1802

1803
  pOperator->exprSupp.hasWindowOrGroup = true;
4,074✔
1804

1805
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
4,074✔
1806
  initResultSizeInfo(&pOperator->resultInfo, 4096);
4,074✔
1807

1808
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
4,074✔
1809
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
4,074!
1810
  initBasicInfo(&pInfo->binfo, pResBlock);
4,074✔
1811

1812
  int32_t      numOfCols = 0;
4,074✔
1813
  SExprInfo*   pExprInfo = NULL;
4,074✔
1814
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
4,074✔
1815
  QUERY_CHECK_CODE(code, lino, _error);
4,074!
1816

1817
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
4,074✔
1818
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
4,074✔
1819
  QUERY_CHECK_CODE(code, lino, _error);
4,074!
1820

1821
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
4,074✔
1822
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
4,074✔
1823
  pInfo->gap = pSessionNode->gap;
4,074✔
1824

1825
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
4,074✔
1826
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
4,074✔
1827
  QUERY_CHECK_CODE(code, lino, _error);
4,074!
1828

1829
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
4,074✔
1830
  pInfo->binfo.pRes = pResBlock;
4,074✔
1831
  pInfo->winSup.prevTs = INT64_MIN;
4,074✔
1832
  pInfo->reptScan = false;
4,074✔
1833
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
4,074✔
1834
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
4,074✔
1835

1836
  if (pSessionNode->window.pExprs != NULL) {
4,074✔
1837
    int32_t    numOfScalar = 0;
1✔
1838
    SExprInfo* pScalarExprInfo = NULL;
1✔
1839
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
1✔
1840
    QUERY_CHECK_CODE(code, lino, _error);
1!
1841

1842
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
1✔
1843
    QUERY_CHECK_CODE(code, lino, _error);
1!
1844
  }
1845

1846
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
4,074✔
1847
  QUERY_CHECK_CODE(code, lino, _error);
4,074!
1848

1849
  pInfo->pOperator = pOperator;
4,074✔
1850
  pInfo->cleanGroupResInfo = false;
4,074✔
1851
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
4,074✔
1852
                  pInfo, pTaskInfo);
1853
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
4,074✔
1854
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1855
  pOperator->pTaskInfo = pTaskInfo;
4,074✔
1856
  code = appendDownstream(pOperator, &downstream, 1);
4,074✔
1857
  QUERY_CHECK_CODE(code, lino, _error);
4,074!
1858

1859
  *pOptrInfo = pOperator;
4,074✔
1860
  return TSDB_CODE_SUCCESS;
4,074✔
1861

1862
_error:
×
1863
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
1864
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1865
  pTaskInfo->code = code;
×
1866
  return code;
×
1867
}
1868

1869
void destroyMAIOperatorInfo(void* param) {
8,328✔
1870
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
8,328✔
1871
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
8,328✔
1872
  taosMemoryFreeClear(param);
8,328!
1873
}
8,328✔
1874

1875
// Obtains a new result row from the aggregate supporter's result buffer and, when that succeeds, records its
// page id and offset as the current position in `pResultRowInfo`. Returns NULL when no row could be obtained.
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
  SResultRow* pRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  if (pRow != NULL) {
    pResultRowInfo->cur = (SResultRowPosition){.pageId = pRow->pageId, .offset = pRow->offset};
  }

  return pRow;
}
1883

1884
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
12,223,423✔
1885
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
1886
  if (*pResult == NULL) {
12,223,423✔
1887
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
5,751✔
1888
    if (*pResult == NULL) {
5,751!
1889
      return terrno;
×
1890
    }
1891
  }
1892

1893
  // set time window for current result
1894
  (*pResult)->win = (*win);
12,223,423✔
1895
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
12,223,423✔
1896
}
1897

1898
// Aggregates one input block for the merge-aligned interval operator.
//
// Rows are grouped by identical timestamp: every run of rows whose timestamp equals miaInfo->curTs is fed into the
// single reusable result row (miaInfo->pResultRow). Whenever the timestamp changes, the finished row is finalized
// into `pResultBlock`, reset, and re-initialised for the window starting at the new timestamp. The last run of the
// block is aggregated but left open so that the next block can continue it.
//
// Errors are reported by long-jumping to pTaskInfo->env.
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
  SInterval*     pInterval = &iaInfo->interval;

  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);

  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

  // a result from a previous block is still open
  if (miaInfo->curTs != INT64_MIN) {
    if (ts != miaInfo->curTs) {
      // the block starts at a different timestamp: close the pending result first
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
      miaInfo->curTs = ts;
    }
  } else {
    miaInfo->curTs = ts;
  }

  STimeWindow currWin = {0};
  currWin.skey = miaInfo->curTs;
  currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  int32_t currPos = startPos;
  while (++currPos < pBlock->info.rows) {
    if (tsCols[currPos] == miaInfo->curTs) {
      continue;
    }

    // timestamp changed: aggregate rows [startPos, currPos) into the current window and emit it
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
    miaInfo->curTs = tsCols[currPos];

    currWin.skey = miaInfo->curTs;
    currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

    startPos = currPos;
    // bind the row to the window that has just been opened (not to the first window of the block)
    ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  // aggregate the trailing run; it stays open until the timestamp or the group changes
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
46,252✔
1970

1971
// Called once the results of a group have been generated: stamps the finished group's id on `pRes`, then clears
// the per-group state (no open timestamp, group id 0).
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
  // must be copied before the group id is reset below
  pRes->info.id.groupId = pMiaInfo->groupId;

  pMiaInfo->groupId = 0;
  pMiaInfo->curTs = INT64_MIN;
}
24,105✔
1976

1977
// Pulls blocks from downstream and aggregates them until one of the following happens:
//   - the input is exhausted (the operator is marked completed),
//   - the group id changes and the finished group left rows in the result block after filtering,
//   - the result block reached the operator's capacity.
// A block that belongs to the next group is parked in pMiaInfo->prefetchedBlock and consumed first on the next
// iteration/call.
//
// Errors are logged, stored in pTaskInfo->code and reported by long-jumping to pTaskInfo->env.
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
  int32_t                               code = TSDB_CODE_SUCCESS;
  int32_t                               lino = 0;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;

  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;

  while (1) {
    SSDataBlock* pBlock = NULL;
    if (pMiaInfo->prefetchedBlock == NULL) {
      pBlock = getNextBlockFromDownstream(pOperator, 0);
    } else {
      // resume with the block that opened the new group
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;

      pMiaInfo->groupId = pBlock->info.id.groupId;
    }

    // no data exists, all query processing is done
    if (pBlock == NULL) {
      // close last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      setOperatorCompleted(pOperator);
      break;
    }

    if (pMiaInfo->groupId == 0) {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    } else {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        // if there are unclosed time window, close it firstly.
        if (pMiaInfo->curTs == INT64_MIN) {
          // jump with the code that was just recorded; terrno is unrelated here and may well be 0
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
        }
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));

        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
        if (pRes->info.rows == 0) {
          // After filtering for last group, the result is empty, so we need to continue to process next group
          continue;
        } else {
          break;
        }
      } else {
        // continue
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
39,382✔
2067

2068
// Produces the next result block of the merge-aligned interval operator.
// The result block is cleared and refilled by doMergeAlignedIntervalAgg. When result merging is enabled, the
// aggregation is repeated until the block reaches the operator's threshold or the operator is done.
// *ppRes is the result block, or NULL when it holds no rows. Always returns TSDB_CODE_SUCCESS.
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pOut = iaInfo->binfo.pRes;
  blockDataCleanup(pOut);

  if (!iaInfo->binfo.mergeResultBlock) {
    // one aggregation round per call
    doMergeAlignedIntervalAgg(pOperator);
  } else {
    // keep accumulating small results into the same block
    while (pOperator->status != OP_EXEC_DONE && pOut->info.rows < pOperator->resultInfo.threshold) {
      doMergeAlignedIntervalAgg(pOperator);
    }
  }

  size_t rows = pOut->info.rows;
  pOperator->resultInfo.totalRows += rows;
  (*ppRes) = (rows == 0) ? NULL : pOut;
  return TSDB_CODE_SUCCESS;
}
2102

2103
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
8,328✔
2104
                                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2105
  QRY_PARAM_CHECK(pOptrInfo);
8,328!
2106

2107
  int32_t                               code = TSDB_CODE_SUCCESS;
8,328✔
2108
  int32_t                               lino = 0;
8,328✔
2109
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
8,328!
2110
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
8,328!
2111
  if (miaInfo == NULL || pOperator == NULL) {
8,328!
2112
    code = terrno;
×
2113
    goto _error;
×
2114
  }
2115

2116
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
8,328!
2117
  if (miaInfo->intervalAggOperatorInfo == NULL) {
8,328!
2118
    code = terrno;
×
2119
    goto _error;
×
2120
  }
2121

2122
  SInterval interval = {.interval = pNode->interval,
8,328✔
2123
                        .sliding = pNode->sliding,
8,328✔
2124
                        .intervalUnit = pNode->intervalUnit,
8,328✔
2125
                        .slidingUnit = pNode->slidingUnit,
8,328✔
2126
                        .offset = pNode->offset,
8,328✔
2127
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
8,328✔
2128
                        .timeRange = pNode->timeRange};
2129
  calcIntervalAutoOffset(&interval);
8,328✔
2130

2131
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
8,328✔
2132
  SExprSupp*                pSup = &pOperator->exprSupp;
8,328✔
2133
  pSup->hasWindowOrGroup = true;
8,328✔
2134

2135
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
8,328✔
2136
  QUERY_CHECK_CODE(code, lino, _error);
8,328!
2137

2138
  miaInfo->curTs = INT64_MIN;
8,328✔
2139
  iaInfo->win = pTaskInfo->window;
8,328✔
2140
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
8,328✔
2141
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
8,328✔
2142
  iaInfo->interval = interval;
8,328✔
2143
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
8,328✔
2144
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
8,328✔
2145

2146
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
8,328✔
2147
  initResultSizeInfo(&pOperator->resultInfo, 512);
8,328✔
2148

2149
  int32_t    num = 0;
8,328✔
2150
  SExprInfo* pExprInfo = NULL;
8,328✔
2151
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
8,328✔
2152
  QUERY_CHECK_CODE(code, lino, _error);
8,328!
2153

2154
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
8,328✔
2155
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
8,328✔
2156
  QUERY_CHECK_CODE(code, lino, _error);
8,328!
2157

2158
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
8,328✔
2159
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
8,328!
2160
  initBasicInfo(&iaInfo->binfo, pResBlock);
8,328✔
2161
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
8,328✔
2162
  QUERY_CHECK_CODE(code, lino, _error);
8,328!
2163

2164
  iaInfo->timeWindowInterpo = false;
8,328✔
2165
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
8,328✔
2166
  QUERY_CHECK_CODE(code, lino, _error);
8,328!
2167
  if (iaInfo->timeWindowInterpo) {
8,328!
2168
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2169
  }
2170

2171
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
8,328✔
2172
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
8,328✔
2173
  QUERY_CHECK_CODE(code, lino, _error);
8,328!
2174
  iaInfo->pOperator = pOperator;
8,328✔
2175
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
8,328✔
2176
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2177

2178
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
8,328✔
2179
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2180

2181
  code = appendDownstream(pOperator, &downstream, 1);
8,328✔
2182
  QUERY_CHECK_CODE(code, lino, _error);
8,328!
2183

2184
  *pOptrInfo = pOperator;
8,328✔
2185
  return TSDB_CODE_SUCCESS;
8,328✔
2186

2187
_error:
×
2188
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2189
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2190
  pTaskInfo->code = code;
×
2191
  return code;
×
2192
}
2193

2194
//=====================================================================================================================
2195
// merge interval operator
2196
typedef struct SMergeIntervalAggOperatorInfo {
2197
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2198
  SList*                   groupIntervals;
2199
  SListIter                groupIntervalsIter;
2200
  bool                     hasGroupId;
2201
  uint64_t                 groupId;
2202
  SSDataBlock*             prefetchedBlock;
2203
  bool                     inputBlocksFinished;
2204
} SMergeIntervalAggOperatorInfo;
2205

2206
typedef struct SGroupTimeWindow {
2207
  uint64_t    groupId;
2208
  STimeWindow window;
2209
} SGroupTimeWindow;
2210

2211
void destroyMergeIntervalOperatorInfo(void* param) {
×
2212
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2213
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2214
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2215

2216
  taosMemoryFreeClear(param);
×
2217
}
×
2218

2219
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2220
                                        STimeWindow* newWin) {
2221
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2222
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2223
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2224

2225
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2226
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2227
  if (code != TSDB_CODE_SUCCESS) {
×
2228
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2229
    return code;
×
2230
  }
2231

2232
  SListIter iter = {0};
×
2233
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2234
  SListNode* listNode = NULL;
×
2235
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2236
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2237
    if (prevGrpWin->groupId != tableGroupId) {
×
2238
      continue;
×
2239
    }
2240

2241
    STimeWindow* prevWin = &prevGrpWin->window;
×
2242
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2243
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2244
      taosMemoryFreeClear(tmp);
×
2245
    }
2246
  }
2247

2248
  return TSDB_CODE_SUCCESS;
×
2249
}
2250

2251
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
2252
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
2253
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2254
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2255

2256
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
×
2257
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;
×
2258

2259
  int32_t     startPos = 0;
×
2260
  int32_t     numOfOutput = pExprSup->numOfExprs;
×
2261
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
×
2262
  uint64_t    tableGroupId = pBlock->info.id.groupId;
×
2263
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2264
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
×
2265
  SResultRow* pResult = NULL;
×
2266

2267
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
×
2268
                                        iaInfo->binfo.inputTsOrder);
2269

2270
  int32_t ret =
2271
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2272
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2273
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2274
    T_LONG_JMP(pTaskInfo->env, ret);
×
2275
  }
2276

2277
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
×
2278
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2279
                                                 iaInfo->binfo.inputTsOrder);
2280
  if(forwardRows <= 0) {
×
2281
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2282
  }
2283

2284
  // prev time window not interpolation yet.
2285
  if (iaInfo->timeWindowInterpo) {
×
2286
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
×
2287
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
×
2288

2289
    // restore current time window
2290
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2291
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2292
    if (ret != TSDB_CODE_SUCCESS) {
×
2293
      T_LONG_JMP(pTaskInfo->env, ret);
×
2294
    }
2295

2296
    // window start key interpolation
2297
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
×
2298
    if (ret != TSDB_CODE_SUCCESS) {
×
2299
      T_LONG_JMP(pTaskInfo->env, ret);
×
2300
    }
2301
  }
2302

2303
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
×
2304
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
×
2305
                                  pBlock->info.rows, numOfOutput);
×
2306
  if (ret != TSDB_CODE_SUCCESS) {
×
2307
    T_LONG_JMP(pTaskInfo->env, ret);
×
2308
  }
2309
  doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2310

2311
  // output previous interval results after this interval (&win) is closed
2312
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
×
2313
  if (code != TSDB_CODE_SUCCESS) {
×
2314
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2315
    T_LONG_JMP(pTaskInfo->env, code);
×
2316
  }
2317

2318
  STimeWindow nextWin = win;
×
2319
  while (1) {
×
2320
    int32_t prevEndPos = forwardRows - 1 + startPos;
×
2321
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
×
2322
                                      iaInfo->binfo.inputTsOrder);
2323
    if (startPos < 0) {
×
2324
      break;
×
2325
    }
2326

2327
    // null data, failed to allocate more memory buffer
2328
    code =
2329
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
×
2330
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2331
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2332
      T_LONG_JMP(pTaskInfo->env, code);
×
2333
    }
2334

2335
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
×
2336
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2337
                                           iaInfo->binfo.inputTsOrder);
2338

2339
    // window start(end) key interpolation
2340
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
×
2341
    if (code != TSDB_CODE_SUCCESS) {
×
2342
      T_LONG_JMP(pTaskInfo->env, code);
×
2343
    }
2344

2345
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
×
2346
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2347
                                           forwardRows, pBlock->info.rows, numOfOutput);
×
2348
    if (code != TSDB_CODE_SUCCESS) {
×
2349
      T_LONG_JMP(pTaskInfo->env, code);
×
2350
    }
2351
    doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2352

2353
    // output previous interval results after this interval (&nextWin) is closed
2354
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
×
2355
    if (code != TSDB_CODE_SUCCESS) {
×
2356
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2357
      T_LONG_JMP(pTaskInfo->env, code);
×
2358
    }
2359
  }
2360

2361
  if (iaInfo->timeWindowInterpo) {
×
2362
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
×
2363
  }
2364
}
×
2365

2366
// "Get next block" function of the merge interval operator.
//
// While input remains, blocks are pulled from downstream 0 and aggregated with doMergeIntervalAggImpl().
// Pulling stops when the downstream is exhausted, when a block of a different group arrives (it is kept
// in prefetchedBlock for the next call), or when the result block has reached the operator's row threshold.
// Once the input is finished, each call advances groupIntervalsIter by one entry and copies that entry's
// group id into the result block.
//
// pOperator  the merge interval operator
// ppRes      receives the result block, or NULL when the result holds no rows or the operator is done
//
// Returns TSDB_CODE_SUCCESS. On failure the code is logged, stored in pTaskInfo->code and raised through
// T_LONG_JMP.
//
// NOTE(review): within the code visible in this section nothing appends rows to the result block
// (doMergeIntervalAggImpl only forwards it to outputPrevIntervalResult, which ignores it), so the operator
// appears to complete on its first call -- confirm whether result emission is expected here.
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  size_t         rows = 0;  // initialised up front: `goto _end` must not skip its initialisation
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!miaInfo->inputBlocksFinished) {
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = getNextBlockFromDownstream(pOperator, 0);
      } else {
        // resume with the block that ended the previous call and switch to its group
        pBlock = miaInfo->prefetchedBlock;
        miaInfo->groupId = pBlock->info.id.groupId;
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
        // downstream exhausted: prepare to walk the recorded group windows
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
        // a new group starts: keep the block for the next call
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

      pRes->info.scanFlag = pBlock->info.scanFlag;
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

    pRes->info.id.groupId = miaInfo->groupId;
  }

  if (miaInfo->inputBlocksFinished) {
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);

    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
      pRes->info.id.groupId = grpWin->groupId;
    }
  }

  if (pRes->info.rows == 0) {
    setOperatorCompleted(pOperator);
  }

  rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pRes;
  return code;
}
2450

2451
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2452
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2453
  QRY_PARAM_CHECK(pOptrInfo);
×
2454

2455
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2456
  int32_t                        lino = 0;
×
2457
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2458
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2459
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2460
    code = terrno;
×
2461
    goto _error;
×
2462
  }
2463

2464
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2465
                        .sliding = pIntervalPhyNode->sliding,
×
2466
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2467
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2468
                        .offset = pIntervalPhyNode->offset,
×
2469
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2470
                        .timeRange = pIntervalPhyNode->timeRange};
2471
  calcIntervalAutoOffset(&interval);
×
2472

2473
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2474

2475
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2476
  pIntervalInfo->win = pTaskInfo->window;
×
2477
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2478
  pIntervalInfo->interval = interval;
×
2479
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2480
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2481
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2482

2483
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2484
  pExprSupp->hasWindowOrGroup = true;
×
2485

2486
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2487
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2488

2489
  int32_t    num = 0;
×
2490
  SExprInfo* pExprInfo = NULL;
×
2491
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2492
  QUERY_CHECK_CODE(code, lino, _error);
×
2493

2494
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2495
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2496
  if (code != TSDB_CODE_SUCCESS) {
×
2497
    goto _error;
×
2498
  }
2499

2500
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2501
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2502
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2503
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2504
  QUERY_CHECK_CODE(code, lino, _error);
×
2505

2506
  pIntervalInfo->timeWindowInterpo = false;
×
2507
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2508
  QUERY_CHECK_CODE(code, lino, _error);
×
2509
  if (pIntervalInfo->timeWindowInterpo) {
×
2510
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2511
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2512
      goto _error;
×
2513
    }
2514
  }
2515

2516
  pIntervalInfo->pOperator = pOperator;
×
2517
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2518
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2519
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2520
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2521
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2522

2523
  code = appendDownstream(pOperator, &downstream, 1);
×
2524
  if (code != TSDB_CODE_SUCCESS) {
×
2525
    goto _error;
×
2526
  }
2527

2528
  *pOptrInfo = pOperator;
×
2529
  return TSDB_CODE_SUCCESS;
×
2530
_error:
×
2531
  if (pMergeIntervalInfo != NULL) {
×
2532
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2533
  }
2534
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2535
  pTaskInfo->code = code;
×
2536
  return code;
×
2537
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc