• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.75
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "tchecksum.h"
22
#include "tcommon.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "tfill.h"
26
#include "tglobal.h"
27
#include "tlog.h"
28
#include "ttime.h"
29

30
// State block of the session-window aggregate operator.
// NOTE(review): role inferred from the type name; no code in the visible part of this file reads it - confirm.
typedef struct SSessionAggOperatorInfo {
31
  SOptrBasicInfo     binfo;
32
  SAggSupporter      aggSup;
33
  SExprSupp          scalarSupp;  // support data for evaluating scalar functions
34
  SGroupResInfo      groupResInfo;
35
  SWindowRowsSup     winSup;
36
  bool               reptScan;  // next round scan
37
  int64_t            gap;       // session window gap
38
  int32_t            tsSlotId;  // primary timestamp slot id
39
  STimeWindowAggSupp twAggSup;
40
  SOperatorInfo*     pOperator;
41
  bool               cleanGroupResInfo;
42
} SSessionAggOperatorInfo;
43

44
// State block of the state-window aggregate operator.
// NOTE(review): role inferred from the type name; no code in the visible part of this file reads it - confirm.
typedef struct SStateWindowOperatorInfo {
45
  SOptrBasicInfo     binfo;
46
  SAggSupporter      aggSup;
47
  SExprSupp          scalarSup;
48
  SGroupResInfo      groupResInfo;
49
  SWindowRowsSup     winSup;
50
  SColumn            stateCol;  // NOTE(review): was commented "start row index", which does not fit an SColumn; presumably the column holding the state value - confirm
51
  bool               hasKey;
52
  SStateKeys         stateKey;
53
  int32_t            tsSlotId;  // primary timestamp column slot id
54
  STimeWindowAggSupp twAggSup;
55
  SOperatorInfo*     pOperator;
56
  bool               cleanGroupResInfo;
57
} SStateWindowOperatorInfo;
58

59
// Selects which border of a time window an interpolation step refers to.
// The interpolation helpers in this file branch on RESULT_ROW_START_INTERP and treat
// every other value as the end border.
typedef enum SResultTsInterpType {
60
  RESULT_ROW_START_INTERP = 1,  // window start border (start.key / startInterp)
61
  RESULT_ROW_END_INTERP = 2,    // window end border (end.key / endInterp)
62
} SResultTsInterpType;
63

64
// Payload of one node in SResultRowInfo::openWindow (read through SListNode::data in
// doInterpUnclosedTimeWindow): identifies a result row that is still open.
typedef struct SOpenWindowInfo {
65
  SResultRowPosition pos;      // pageId/offset of the result row in the result buffer
66
  uint64_t           groupId;  // group the window belongs to
67
} SOpenWindowInfo;
68

69
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
70

71
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
72
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
73
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
74

75
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
85,710,375✔
76
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
77
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
78
                                      SExecTaskInfo* pTaskInfo) {
79
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
85,710,375✔
80
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
81

82
  if (pResultRow == NULL || pTaskInfo->code != 0) {
86,185,902!
83
    *pResult = NULL;
×
84
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
85
    return pTaskInfo->code;
×
86
  }
87

88
  // set time window for current result
89
  pResultRow->win = (*win);
86,261,513✔
90

91
  *pResult = pResultRow;
86,261,513✔
92
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
86,261,513✔
93
}
94

95
/**
 * Account for one more row in the window that is currently being accumulated.
 *
 * `ts` becomes both the window end key and the remembered previous timestamp, the row
 * counter grows by one and `groupId` is recorded.
 */
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->numOfRows += 1;

  // the newest row closes the window for now and is the reference for the next row
  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
}
384,570✔
101

102
/**
 * Begin a new window at row `rowIndex`.
 *
 * Records the start row index, takes tsList[rowIndex] as the window start key, resets
 * the row counter to zero and stores `groupId`.
 */
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
                              uint64_t groupId) {
  const int64_t startTs = tsList[rowIndex];

  pRowSup->groupId = groupId;
  pRowSup->win.skey = startTs;
  pRowSup->startRowIndex = rowIndex;
  pRowSup->numOfRows = 0;  // rows are added afterwards, one by one
}
171,078✔
109

110
/**
 * Count the rows, starting at `pos`, that belong to the window ending at `ekey`.
 *
 * searchFn is run on the sub-array beginning at pData[pos] (numOfRows - pos entries).
 * A negative result yields 0. Otherwise the result is the initial row count, and every
 * consecutive row from that position on whose timestamp equals `ekey` adds one more.
 *
 * The procedure is the same for both scan orders; `order` is only forwarded to searchFn.
 * The scan for rows equal to `ekey` is bounded by numOfRows so it cannot read past the
 * end of the timestamp column.
 *
 * @return number of rows to move forward, 0 when searchFn finds no position.
 */
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
  int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
  if (end < 0) {
    return 0;
  }

  int32_t forwardRows = end;

  // rows carrying exactly the end key are part of the window as well
  for (int32_t idx = pos + end; idx < numOfRows && pData[idx] == ekey; ++idx) {
    forwardRows += 1;
  }

  return forwardRows;
}
146

147
/**
 * Binary search over an array of `num` TSKEY values stored at `pValue`.
 *
 * For TSDB_ORDER_DESC the comparisons are mirrored; every other `order` value is handled
 * as ascending. Each round checks, in this sequence:
 *   1. the key lies at or in front of the first remaining element -> return that index;
 *   2. the key equals the last remaining element                  -> return that index;
 *   3. the key lies behind the last remaining element             -> return the index
 *      right after it, or -1 when that index is outside the array;
 *   4. otherwise probe the middle element and either return it (equal) or shrink the range.
 *
 * @return an index in [0, num) or -1 (also for num <= 0).
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  const TSKEY* keys = (const TSKEY*)pValue;
  const bool   desc = (order == TSDB_ORDER_DESC);

  int32_t lo = 0;
  int32_t hi = num - 1;

  for (;;) {
    const TSKEY loKey = keys[lo];
    if (desc ? (key >= loKey) : (key <= loKey)) {
      return lo;
    }

    const TSKEY hiKey = keys[hi];
    if (key == hiKey) {
      return hi;
    }

    if (desc ? (key < hiKey) : (key > hiKey)) {
      // the key is located behind the remaining range
      const int32_t after = hi + 1;
      return (after >= num) ? -1 : after;
    }

    const int32_t span = hi - lo + 1;
    const int32_t mid = (span >> 1) + lo;
    const TSKEY   midKey = keys[mid];

    if (key == midKey) {
      return mid;
    }

    // continue in the half that is further along the scan direction?
    const bool goForward = desc ? (key < midKey) : (key > midKey);
    if (goForward) {
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
}
215

216
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
96,988,156✔
217
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
218
  int32_t num = -1;
96,988,156✔
219
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
96,988,156✔
220

221
  if (order == TSDB_ORDER_ASC) {
96,988,156✔
222
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
81,027,304✔
223
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
78,957,680!
224
      if (item != NULL) {
80,994,576!
225
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
226
      }
227
    } else {
228
      num = pDataBlockInfo->rows - startPos;
2,069,624✔
229
      if (item != NULL) {
2,069,624!
230
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
231
      }
232
    }
233
  } else {  // desc
234
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
15,960,852!
235
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
16,093,017!
236
      if (item != NULL) {
16,102,820!
237
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
238
      }
239
    } else {
UNCOV
240
      num = pDataBlockInfo->rows - startPos;
×
UNCOV
241
      if (item != NULL) {
×
242
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
243
      }
244
    }
245
  }
246

247
  return num;
99,034,855✔
248
}
249

250
/**
 * Compute the interpolated border value at `windowKey` for every function context that
 * needs interval interpolation.
 *
 * For each context k:
 *  - functions for which fmIsIntervalInterpoFunc() is false only get start.key = INT64_MIN;
 *  - otherwise the value at (prevTs, prevRowIndex) and the value at (curTs, curRowIndex)
 *    are read as double from the first parameter column. When prevRowIndex is -1 the
 *    previous value is taken from pPrevValues instead of the data block;
 *  - unless fmIsElapsedFunc() is true, the value at windowKey is linearly interpolated
 *    between the two points (for elapsed functions the value stays 0);
 *  - the result is stored in start.{key,val} when type == RESULT_ROW_START_INTERP and in
 *    end.{key,val} for any other type.
 *
 * @param pPrevValues saved values of the previous block; entry 0 is not read here, the
 *                    entries from 1 on are consumed one per interpolation function.
 * @param pDataBlock  column array of the current data block.
 */
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
  SqlFunctionCtx* pCtx = pSup->pCtx;

  // position inside pPrevValues of the next interpolation function's saved value
  int32_t index = 1;
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
      pCtx[k].start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pParam = &pCtx[k].param[0];
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

    double v1 = 0, v2 = 0, v = 0;
    if (prevRowIndex == -1) {
      // the previous row lives in the preceding block; use the value saved from it
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData);
    } else {
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex));
    }

    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));

    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
    SPoint point = (SPoint){.key = windowKey, .val = &v};

    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
    }

    if (type == RESULT_ROW_START_INTERP) {
      pCtx[k].start.key = point.key;
      pCtx[k].start.val = v;
    } else {
      pCtx[k].end.key = point.key;
      pCtx[k].end.val = v;
    }

    index += 1;
  }
}
22,961✔
319

320
/**
 * Mark one window border as "no interpolation" for the first `numOfOutput` contexts by
 * writing INT64_MIN into the border key: start.key when type is RESULT_ROW_START_INTERP,
 * end.key for any other type.
 */
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  const bool startBorder = (type == RESULT_ROW_START_INTERP);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (startBorder) {
      pCtx[i].start.key = INT64_MIN;
    } else {
      pCtx[i].end.key = INT64_MIN;
    }
  }
}
2,261✔
331

332
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
12,599✔
333
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
334
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
12,599✔
335

336
  TSKEY curTs = tsCols[pos];
12,599✔
337

338
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
12,599✔
339
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
12,599✔
340

341
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
342
  // start exactly from this point, no need to do interpolation
343
  TSKEY key = ascQuery ? win->skey : win->ekey;
12,599!
344
  if (key == curTs) {
12,599✔
345
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
873✔
346
    return true;
873✔
347
  }
348

349
  // it is the first time window, no need to do interpolation
350
  if (pTsKey->isNull && pos == 0) {
11,726!
351
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
386✔
352
  } else {
353
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
11,340!
354
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
11,340✔
355
                              RESULT_ROW_START_INTERP, pSup);
356
  }
357

358
  return true;
11,726✔
359
}
360

361
/**
 * Prepare the end-border interpolation of window `win`.
 *
 * The border key is win->ekey for ascending input and win->skey otherwise.
 *  - If the border lies beyond `blockEkey` in scan direction, the window does not end in
 *    this block: border keys are reset and *pRes is set to false.
 *  - If the row at `endRowIndex` sits exactly on the border, border keys are reset and
 *    *pRes is set to true.
 *  - Otherwise the value is interpolated between the rows at `endRowIndex` and
 *    `nextRowIndex` and *pRes is set to true.
 *
 * @param pRes out: whether the end border has been settled; not written on error.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when an
 *         interpolation is required but `nextRowIndex` is negative.
 */
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                            int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
                                            TSKEY blockEkey, STimeWindow* win, bool* pRes) {
  const int32_t order = pInfo->binfo.inputTsOrder;
  const TSKEY   actualEndKey = tsCols[endRowIndex];
  const TSKEY   key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;

  const bool endsOutsideBlock =
      ((order == TSDB_ORDER_ASC) && key > blockEkey) || ((order == TSDB_ORDER_DESC) && key < blockEkey);

  // either the window continues in the next block, or a real row marks its end point
  if (endsOutsideBlock || key == actualEndKey) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    (*pRes) = !endsOutsideBlock;
    return TSDB_CODE_SUCCESS;
  }

  if (nextRowIndex < 0) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  const TSKEY nextKey = tsCols[nextRowIndex];
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
                            RESULT_ROW_END_INTERP, pSup);
  (*pRes) = true;
  return TSDB_CODE_SUCCESS;
}
396

397
/**
 * Decide whether window `pWin` takes part in the calculation range [calStart, calEnd].
 *
 * Windows of an interval whose length equals its sliding step are always accepted.
 * Otherwise a window is rejected when it lies completely outside the range, or when the
 * block type is STREAM_PULL_DATA and the window starts before calStart.
 */
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
  if (pInterval->interval == pInterval->sliding) {
    return true;
  }

  const bool outsideRange = (pWin->ekey < calStart) || (pWin->skey > calEnd);
  const bool pullStartsEarly = (blockType == STREAM_PULL_DATA) && (pWin->skey < calStart);

  return !(outsideRange || pullStartsEarly);
}
405

406
/**
 * Same check as inCalSlidingWindow(), with the calculation range and the block type
 * taken from the data block info (calWin.skey, calWin.ekey, type).
 */
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  const TSKEY calStart = pBlockInfo->calWin.skey;
  const TSKEY calEnd = pBlockInfo->calWin.ekey;

  return inCalSlidingWindow(pInterval, pWin, calStart, calEnd, pBlockInfo->type);
}
409

410
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
97,490,901✔
411
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
412
  bool ascQuery = (order == TSDB_ORDER_ASC);
97,490,901✔
413

414
  int32_t precision = pInterval->precision;
97,490,901✔
415
  getNextTimeWindow(pInterval, pNext, order);
97,490,901✔
416

417
  // next time window is not in current block
418
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
97,440,950!
419
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
95,504,825✔
420
    return -1;
1,949,285✔
421
  }
422

423
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
95,491,665!
424
    return -1;
30✔
425
  }
426

427
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
95,444,476✔
428
  int32_t startPos = 0;
95,444,476✔
429

430
  // tumbling time window query, a special case of sliding time window query
431
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
95,444,476✔
432
    startPos = prevPosition + 1;
87,922,748✔
433
  } else {
434
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
7,521,728!
435
      startPos = 0;
208,824✔
436
    } else {
437
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
7,312,904✔
438
    }
439
  }
440

441
  /* interp query with fill should not skip time window */
442
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
443
  //    return startPos;
444
  //  }
445

446
  /*
447
   * This time window does not cover any data, try next time window,
448
   * this case may happen when the time window is too small
449
   */
450
  if (primaryKeys != NULL) {
95,712,494!
451
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
138,402,274✔
452
      TSKEY next = primaryKeys[startPos];
42,556,560✔
453
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
42,556,560!
UNCOV
454
        pNext->skey = taosTimeTruncate(next, pInterval);
×
455
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
456
      } else {
457
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
42,584,713✔
458
        pNext->skey = pNext->ekey - pInterval->interval + 1;
42,584,713✔
459
      }
460
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
53,203,691✔
461
      TSKEY next = primaryKeys[startPos];
14,377,299✔
462
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
14,377,299!
463
        pNext->skey = taosTimeTruncate(next, pInterval);
×
464
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
465
      } else {
466
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
14,399,293✔
467
        pNext->ekey = pNext->skey + pInterval->interval - 1;
14,399,293✔
468
      }
469
    }
470
  }
471

472
  return startPos;
95,819,951✔
473
}
474

475
/**
 * Report whether the requested border of the result row is already flagged as
 * interpolated: startInterp for RESULT_ROW_START_INTERP, endInterp for any other type.
 */
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
  return (type == RESULT_ROW_START_INTERP) ? (pResult->startInterp == true) : (pResult->endInterp == true);
}
482

483
/**
 * Flag one border of the result row as interpolated: startInterp for
 * RESULT_ROW_START_INTERP, endInterp for any other type.
 */
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  if (type != RESULT_ROW_START_INTERP) {
    pResult->endInterp = true;
    return;
  }

  pResult->startInterp = true;
}
24,746✔
490

491
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
86,268,655✔
492
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
493
  int32_t code = TSDB_CODE_SUCCESS;
86,268,655✔
494
  int32_t lino = 0;
86,268,655✔
495
  if (!pInfo->timeWindowInterpo) {
86,268,655!
496
    return code;
86,364,952✔
497
  }
498

UNCOV
499
  if (pBlock == NULL) {
×
500
    code = TSDB_CODE_INVALID_PARA;
×
501
    return code;
×
502
  }
503

UNCOV
504
  if (pBlock->pDataBlock == NULL) {
×
505
    return code;
×
506
  }
507

UNCOV
508
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
×
509

510
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
12,611✔
511
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
12,611✔
512
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
12,611✔
513
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
12,599✔
514
    if (interp) {
12,599!
515
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
12,599✔
516
    }
517
  } else {
518
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
12✔
519
  }
520

521
  // point interpolation does not require the end key time window interpolation.
522
  // interpolation query does not generate the time window end interpolation
523
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
12,611✔
524
  if (!done) {
12,611!
525
    int32_t endRowIndex = startPos + forwardRows - 1;
12,611✔
526
    int32_t nextRowIndex = endRowIndex + 1;
12,611✔
527

528
    // duplicated ts row does not involve in the interpolation of end value for current time window
529
    int32_t x = endRowIndex;
12,611✔
530
    while (x > 0) {
12,618✔
531
      if (tsCols[x] == tsCols[x - 1]) {
12,186✔
532
        x -= 1;
7✔
533
      } else {
534
        endRowIndex = x;
12,179✔
535
        break;
12,179✔
536
      }
537
    }
538

539
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
12,611!
540
    bool  interp = false;
12,611✔
541
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols,
12,611✔
542
                                           endKey, win, &interp);
543
    QUERY_CHECK_CODE(code, lino, _end);
12,611!
544
    if (interp) {
12,611✔
545
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
12,147✔
546
    }
547
  } else {
548
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
549
  }
550

551
_end:
12,611✔
552
  if (code != TSDB_CODE_SUCCESS) {
12,611!
553
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
554
  }
555
  return code;
12,611✔
556
}
557

558
/**
 * Remember, for every entry of `pPrevKeys`, the last non-null value of the matching
 * column in `pBlock`.
 *
 * Entry k of `pCols` names the column (by slotId) whose value is copied into entry k of
 * `pPrevKeys`. Variable-length values are copied with their full varDataTLen() length,
 * all others with pkey->bytes. A column that is null in every row leaves its entry
 * untouched. Nothing is done when the block carries no columns.
 */
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  size_t numOfKeys = taosArrayGetSize(pPrevKeys);
  for (int32_t idx = 0; idx < numOfKeys; ++idx) {
    SColumn*         pCol = taosArrayGet(pCols, idx);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    SGroupKeys*      pkey = taosArrayGet(pPrevKeys, idx);

    // scan backwards for the last row holding a value
    int32_t row = pBlock->info.rows - 1;
    while (row >= 0 && colDataIsNull_s(pColInfo, row)) {
      --row;
    }
    if (row < 0) {
      continue;
    }

    char*  val = colDataGetData(pColInfo, row);
    size_t len = IS_VAR_DATA_TYPE(pkey->type) ? varDataTLen(val) : pkey->bytes;
    memcpy(pkey->pData, val, len);
  }
}
586

587
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
464✔
588
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
589
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
464✔
590

591
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
464✔
592
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
464✔
593

594
  int32_t startPos = 0;
464✔
595
  int32_t numOfOutput = pSup->numOfExprs;
464✔
596

597
  SResultRow* pResult = NULL;
464✔
598

599
  while (1) {
×
600
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
464✔
601
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
464✔
602
    uint64_t            groupId = pOpenWin->groupId;
464✔
603
    SResultRowPosition* p1 = &pOpenWin->pos;
464✔
604
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
464!
605
      break;
464✔
606
    }
607

608
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
609
    if (NULL == pr) {
×
610
      T_LONG_JMP(pTaskInfo->env, terrno);
×
611
    }
612

613
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
614
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
615
      T_LONG_JMP(pTaskInfo->env, terrno);
×
616
    }
617

618
    if (pr->closed) {
×
619
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
620
             isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)) ) {
×
621
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
622
        T_LONG_JMP(pTaskInfo->env, terrno);
×
623
      }
624
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
625
      taosMemoryFree(pNode);
×
626
      continue;
×
627
    }
628

629
    STimeWindow w = pr->win;
×
630
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
631
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
632
    if (ret != TSDB_CODE_SUCCESS) {
×
633
      T_LONG_JMP(pTaskInfo->env, ret);
×
634
    }
635

636
    if(isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
637
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
638
      T_LONG_JMP(pTaskInfo->env, terrno);
×
639
    }
640

641
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
642
    if (!pTsKey) {
×
643
      pTaskInfo->code = terrno;
×
644
      T_LONG_JMP(pTaskInfo->env, terrno);
×
645
    }
646

647
    int64_t     prevTs = *(int64_t*)pTsKey->pData;
×
648
    if (groupId == pBlock->info.id.groupId) {
×
649
      TSKEY curTs = pBlock->info.window.skey;
×
650
      if (tsCols != NULL) {
×
651
        curTs = tsCols[startPos];
×
652
      }
653
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
654
                                RESULT_ROW_END_INTERP, pSup);
655
    }
656

657
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
658
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
659

660
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
661
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
662
                                          pBlock->info.rows, numOfExprs);
×
663
    if (ret != TSDB_CODE_SUCCESS) {
×
664
      T_LONG_JMP(pTaskInfo->env, ret);
×
665
    }
666

667
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
668
      closeResultRow(pr);
×
669
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
670
      taosMemoryFree(pNode);
×
671
    } else {  // the remains are can not be closed yet.
672
      break;
×
673
    }
674
  }
675
}
464✔
676

677
static bool tsKeyCompFn(void* l, void* r, void* param) {
12,312,883✔
678
  TSKEY*                    lTS = (TSKEY*)l;
12,312,883✔
679
  TSKEY*                    rTS = (TSKEY*)r;
12,312,883✔
680
  SIntervalAggOperatorInfo* pInfo = param;
12,312,883✔
681
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
12,312,883✔
682
}
683

684
/**
 * Check whether a result row already exists for the window: builds the result-window key
 * from the window start key and `tableGroupId` and looks it up in
 * pInfo->aggSup.pResultRowHashTable.
 *
 * @return true when the key is present in the hash table.
 */
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);

  void* pEntry = tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
  return pEntry != NULL;
}
689

690
/**
691
 * @brief check if cur window should be filtered out by limit info
692
 * @retval true if should be filtered out
693
 * @retval false if not filtering out
694
 * @note If no limit info, we skip filtering.
695
 *       If input/output ts order mismatch, we skip filtering too.
696
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
697
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
698
 *       every tuple in every block.
699
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
700
 */
701
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId, SExecTaskInfo* pTaskInfo) {
85,725,565✔
702
  int32_t code = TSDB_CODE_SUCCESS;
85,725,565✔
703
  int32_t lino = 0;
85,725,565✔
704
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
85,725,565✔
705
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
10,043,400✔
706
      // if input/output ts order mismatch, no filter
707
  ) {
708
    return false;
82,201,732✔
709
  }
710

711
  if (pOperatorInfo->limit == 0) return true;
3,523,833✔
712

713
  if (pOperatorInfo->pBQ == NULL) {
3,523,219✔
714
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
11,042✔
715
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
11,039!
716
  }
717

718
  bool shouldFilter = false;
3,523,216✔
719
  // if BQ has been full, compare it with top of BQ
720
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
3,523,216✔
721
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
1,532,922✔
722
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
1,529,724✔
723
  }
724
  if (shouldFilter) {
3,491,801✔
725
    return true;
54,889✔
726
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
3,436,912✔
727
    return false;
2,049,504✔
728
  }
729

730
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
731
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
1,500,236!
732
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
1,507,245!
733

734
  *((TSKEY*)node.data) = win->skey;
1,507,245✔
735

736
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
1,507,245✔
737
    taosMemoryFree(node.data);
13!
738
    return true;
×
739
  }
740

741
_end:
1,415,178✔
742
  if (code != TSDB_CODE_SUCCESS) {
1,415,178!
743
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
744
    pTaskInfo->code = code;
×
745
    T_LONG_JMP(pTaskInfo->env, code);
×
746
  }
747
  return false;
1,415,178✔
748
}
749

750
/**
 * Aggregate one input block into the interval windows it overlaps.
 *
 * The first window is derived from the block's start timestamp; the following windows are walked with
 * getNextQualifiedWindow() until the block is exhausted or filterWindowWithLimit() rejects a window.
 *
 * @param pOperatorInfo  interval operator; its info is a SIntervalAggOperatorInfo
 * @param pResultRowInfo result-row bookkeeping of the operator
 * @param pBlock         input block to aggregate
 * @param scanFlag       scan flag of the block, compared against MAIN_SCAN
 * @return true when the block starts a new group and the SLIMIT group quota is already exceeded (the caller
 *         stops reading input); false in every other case.
 *
 * Failures do not return: they leave through T_LONG_JMP on pTaskInfo->env.
 */
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  // A new group id: count it against SLIMIT and restart the per-group bounded queue.
  if (tableGroupId != pInfo->curGroupId) {
    pInfo->handledGroupNum += 1;
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
      return true;
    } else {
      pInfo->curGroupId = tableGroupId;
      destroyBoundedQueue(pInfo->pBQ);
      pInfo->pBQ = NULL;
    }
  }

  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;

  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // Never abort while carrying TSDB_CODE_SUCCESS: a missing result row without an error code means the
    // result buffer could not be provided, so report it as out of memory.
    if (ret == TSDB_CODE_SUCCESS) {
      ret = TSDB_CODE_OUT_OF_MEMORY;
    }
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 pInfo->binfo.inputTsOrder);

  // prev time window not interpolation yet.
  if (pInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                        pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  doCloseWindow(pResultRowInfo, pInfo, pResult);

  // Walk the remaining windows covered by this block.
  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      pInfo->binfo.inputTsOrder);
    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
      break;
    }

    // null data, failed to allocate more memory buffer
    ret = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                 pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
      // Same rule as for the first window: do not abort with a success code.
      if (ret == TSDB_CODE_SUCCESS) {
        ret = TSDB_CODE_OUT_OF_MEMORY;
      }
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           pInfo->binfo.inputTsOrder);
    // window start(end) key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    // TODO: add to open window? how to close the open windows after input blocks exhausted?

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                          pBlock->info.rows, numOfOutput);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    doCloseWindow(pResultRowInfo, pInfo, pResult);
  }

  // Remember the last row of this block for interpolation across blocks.
  if (pInfo->timeWindowInterpo) {
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }
  return false;
}
865

866
/**
 * Close a result row whose end border has been interpolated and drop the head of the open-window list.
 * Nothing happens unless window interpolation is enabled and the row carries RESULT_ROW_END_INTERP.
 */
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }

  if (!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    return;
  }

  // current result is done in computing final results.
  closeResultRow(pResult);

  SListNode* pHead = tdListPopHead(pResultRowInfo->openWindow);
  taosMemoryFree(pHead);
}
85,744,157✔
874

875
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
464✔
876
                                       SExecTaskInfo* pTaskInfo) {
877
  int32_t         code = TSDB_CODE_SUCCESS;
464✔
878
  int32_t         lino = 0;
464✔
879
  SOpenWindowInfo openWin = {0};
464✔
880
  openWin.pos.pageId = pResult->pageId;
464✔
881
  openWin.pos.offset = pResult->offset;
464✔
882
  openWin.groupId = groupId;
464✔
883
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
464✔
884
  if (pn == NULL) {
464✔
885
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
458✔
886
    QUERY_CHECK_CODE(code, lino, _end);
458!
887
    return openWin.pos;
458✔
888
  }
889

890
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
6✔
891
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
6!
892
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
893
    QUERY_CHECK_CODE(code, lino, _end);
×
894
  }
895

896
_end:
6✔
897
  if (code != TSDB_CODE_SUCCESS) {
6!
898
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
899
    pTaskInfo->code = code;
×
900
    T_LONG_JMP(pTaskInfo->env, code);
×
901
  }
902
  return openWin.pos;
6✔
903
}
904

905
/**
 * Return the primary timestamp column of a block as a raw int64 array.
 *
 * Returns NULL when the block has no column array or its data is not loaded. When the first timestamp is
 * non-zero and the block window is still {0, 0}, the window is refreshed with blockDataUpdateTsWindow().
 * A missing column or a failed window update does not return: the code is stored in pTaskInfo->code and
 * raised via T_LONG_JMP.
 */
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pTsColumn = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  if (pTsColumn == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  TSKEY* tsCols = (int64_t*)pTsColumn->pData;
  if (tsCols[0] == 0) {
    // A zero start timestamp is only reported; the block window is left untouched in that case.
    qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0], tsCols[pBlock->info.rows - 1]);
    return tsCols;
  }

  bool windowUnset = (pBlock->info.window.skey == 0) && (pBlock->info.window.ekey == 0);
  if (windowUnset) {
    int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
932

933
/**
 * Open the interval operator: read every block from downstream, run the optional scalar projection on it,
 * aggregate it with hashIntervalAgg(), then prepare the grouped result info for output.
 *
 * Returns TSDB_CODE_SUCCESS immediately when the operator is already opened. Input reading stops early when
 * hashIntervalAgg() returns true. Failures do not return: they are logged, stored in pTaskInfo->code and
 * raised via T_LONG_JMP.
 */
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                pSup = &pOperator->exprSupp;

  int64_t st = taosGetTimestampUs();

  // Until the grouped result info is initialised it must not be cleaned on destroy.
  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    int32_t scanFlag = pBlock->info.scanFlag;
    pInfo->binfo.pRes->info.scanFlag = scanFlag;

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
  }

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  OPTR_SET_OPENED(pOperator);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
986

987
/**
 * Split one block into state windows and aggregate each of them.
 *
 * Rows whose state column is NULL are skipped. A row extends the current window while its state value equals
 * the stored key; a different value flushes the current window and starts a new one. The window still in
 * progress at the end of the block is applied as well, with its end key set to the block's last timestamp.
 * Failures do not return: they are raised via T_LONG_JMP.
 */
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pStateCol = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
  if (pStateCol == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }
  int64_t gid = pBlock->info.id.groupId;

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int32_t bytes = pStateCol->info.bytes;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (pTsCol == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }
  TSKEY* tsList = (TSKEY*)pTsCol->pData;

  // Row bookkeeping restarts for every block.
  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    struct SColumnDataAgg* pAgg = NULL;
    if (pBlock->pBlockAgg != NULL) {
      pAgg = &pBlock->pBlockAgg[pInfo->stateCol.slotId];
    }

    if (colDataIsNull(pStateCol, pBlock->info.rows, row, pAgg)) {
      continue;
    }

    char* val = colDataGetData(pStateCol, row);

    if (gid != pRowSup->groupId || !pInfo->hasKey) {
      // First key of this group (or first key at all): remember it and open a window.
      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }

      pInfo->hasKey = true;

      doKeepNewWindowStartInfo(pRowSup, tsList, row, gid);
      doKeepTuple(pRowSup, tsList[row], gid);
    } else if (compareVal(val, &pInfo->stateKey)) {
      // Same state value: the row belongs to the current window.
      doKeepTuple(pRowSup, tsList[row], gid);
    } else {  // a new state window started
      SResultRow* pClosedRow = NULL;

      // keep the time window for the closed time window.
      STimeWindow closedWin = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
      int32_t rc = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &closedWin, masterScan, &pClosedRow, gid,
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup,
                                          pTaskInfo);
      if (rc != TSDB_CODE_SUCCESS) {  // null data, too many state code
        T_LONG_JMP(pTaskInfo->env, rc);
      }

      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &closedWin, 0);
      rc = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                           pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
      if (rc != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, rc);
      }

      // here we start a new session window
      doKeepNewWindowStartInfo(pRowSup, tsList, row, gid);
      doKeepTuple(pRowSup, tsList[row], gid);

      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }
    }
  }

  // Apply the window that is still in progress when the block ends.
  SResultRow* pLastRow = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t rc = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pLastRow, gid,
                                      pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (rc != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  rc = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                       pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (rc != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }
}
6,769✔
1084

1085
/**
 * Open the state-window operator: read every block from downstream, refresh its timestamp window, run the
 * optional scalar projection and aggregate it with doStateWindowAggImpl(); afterwards prepare the grouped
 * result info and switch the operator to OP_RES_TO_RETURN.
 *
 * Returns TSDB_CODE_SUCCESS immediately when the operator is already opened. Failures do not return: they are
 * logged, stored in pTaskInfo->code and raised via T_LONG_JMP.
 */
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = pInfo->binfo.inputTsOrder;
  int64_t    st = taosGetTimestampUs();

  // Until the grouped result info is initialised it must not be cleaned on destroy.
  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      // Route the failure through _end so it is logged like every other error in this function.
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doStateWindowAggImpl(pOperator, pInfo, pBlock);
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;
  pOperator->status = OP_RES_TO_RETURN;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1140

1141
/**
 * Produce the next result block of the state-window operator.
 *
 * Opens the operator on demand, then keeps building and filtering result blocks until one has rows or no
 * results remain (in which case the operator is marked completed). *ppRes is NULL when the operator is
 * already done or the produced block is empty. Failures do not return: they are logged, stored in
 * pTaskInfo->code and raised via T_LONG_JMP.
 */
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;
  int32_t                   lino = 0;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  for (;;) {
    doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // Out of results: finish, even if the last block was filtered down to nothing.
    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // Results remain; only loop again when the filter left this block empty.
    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
  return code;
}
1186

1187
/**
 * Produce the next result block of the interval operator.
 *
 * Opens the operator on demand, then keeps building and filtering result blocks until one has rows or no
 * results remain (in which case the operator is marked completed). *ppRes is NULL when the operator is
 * already done or the produced block is empty. Failures do not return: they are logged, stored in
 * pTaskInfo->code and raised via T_LONG_JMP.
 */
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  size_t                    rows = 0;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pBlock = pInfo->binfo.pRes;
  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // Out of results: finish, even if the last block was filtered down to nothing.
    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // Results remain; only loop again when the filter left this block empty.
    if (pBlock->info.rows > 0) {
      break;
    }
  }

  rows = pBlock->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pBlock;
  return code;
}
1230

1231
static void destroyStateWindowOperatorInfo(void* param) {
4,596✔
1232
  if (param == NULL) {
4,596!
1233
    return;
×
1234
  }
1235
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
4,596✔
1236
  cleanupBasicInfo(&pInfo->binfo);
4,596✔
1237
  taosMemoryFreeClear(pInfo->stateKey.pData);
4,596!
1238
  if (pInfo->pOperator) {
4,596!
1239
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
4,596✔
1240
                      pInfo->cleanGroupResInfo);
4,596✔
1241
    pInfo->pOperator = NULL;
4,596✔
1242
  }
1243

1244
  cleanupExprSupp(&pInfo->scalarSup);
4,596✔
1245
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
4,596✔
1246
  cleanupAggSup(&pInfo->aggSup);
4,596✔
1247
  cleanupGroupResInfo(&pInfo->groupResInfo);
4,596✔
1248

1249
  taosMemoryFreeClear(param);
4,596!
1250
}
1251

1252
static void freeItem(void* param) {
1,630✔
1253
  SGroupKeys* pKey = (SGroupKeys*)param;
1,630✔
1254
  taosMemoryFree(pKey->pData);
1,630!
1255
}
1,630✔
1256

1257
void destroyIntervalOperatorInfo(void* param) {
73,636✔
1258
  if (param == NULL) {
73,636!
1259
    return;
×
1260
  }
1261

1262
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
73,636✔
1263

1264
  cleanupBasicInfo(&pInfo->binfo);
73,636✔
1265
  if (pInfo->pOperator) {
73,645!
1266
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
73,645✔
1267
                      pInfo->cleanGroupResInfo);
73,645✔
1268
    pInfo->pOperator = NULL;
73,643✔
1269
  }
1270

1271
  cleanupAggSup(&pInfo->aggSup);
73,643✔
1272
  cleanupExprSupp(&pInfo->scalarSupp);
73,643✔
1273

1274
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
73,645✔
1275

1276
  taosArrayDestroy(pInfo->pInterpCols);
73,646✔
1277
  pInfo->pInterpCols = NULL;
73,637✔
1278

1279
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
73,637✔
1280
  pInfo->pPrevValues = NULL;
73,636✔
1281

1282
  cleanupGroupResInfo(&pInfo->groupResInfo);
73,636✔
1283
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
73,646✔
1284
  destroyBoundedQueue(pInfo->pBQ);
73,646✔
1285
  taosMemoryFreeClear(param);
73,635!
1286
}
1287

1288
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
73,568✔
1289
                                   bool* pRes) {
1290
  // the primary timestamp column
1291
  bool    needed = false;
73,568✔
1292
  int32_t code = TSDB_CODE_SUCCESS;
73,568✔
1293
  int32_t lino = 0;
73,568✔
1294
  void*   tmp = NULL;
73,568✔
1295

1296
  for (int32_t i = 0; i < numOfCols; ++i) {
410,065✔
1297
    SExprInfo* pExpr = pCtx[i].pExpr;
336,946✔
1298
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
336,946✔
1299
      needed = true;
450✔
1300
      break;
450✔
1301
    }
1302
  }
1303

1304
  if (needed) {
73,569✔
1305
    pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
450✔
1306
    QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
450!
1307

1308
    pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
450✔
1309
    QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
450!
1310

1311
    {  // ts column
1312
      SColumn c = {0};
450✔
1313
      c.colId = 1;
450✔
1314
      c.slotId = pInfo->primaryTsIndex;
450✔
1315
      c.type = TSDB_DATA_TYPE_TIMESTAMP;
450✔
1316
      c.bytes = sizeof(int64_t);
450✔
1317
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
450✔
1318
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
450!
1319

1320
      SGroupKeys key;
1321
      key.bytes = c.bytes;
450✔
1322
      key.type = c.type;
450✔
1323
      key.isNull = true;  // to denote no value is assigned yet
450✔
1324
      key.pData = taosMemoryCalloc(1, c.bytes);
450!
1325
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
450!
1326

1327
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
450✔
1328
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
450!
1329
    }
1330
  }
1331

1332
  for (int32_t i = 0; i < numOfCols; ++i) {
413,020✔
1333
    SExprInfo* pExpr = pCtx[i].pExpr;
339,414✔
1334

1335
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
339,414✔
1336
      SFunctParam* pParam = &pExpr->base.pParam[0];
1,175✔
1337

1338
      SColumn c = *pParam->pCol;
1,175✔
1339
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
1,175✔
1340
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,180!
1341

1342
      SGroupKeys key = {0};
1,180✔
1343
      key.bytes = c.bytes;
1,180✔
1344
      key.type = c.type;
1,180✔
1345
      key.isNull = false;
1,180✔
1346
      key.pData = taosMemoryCalloc(1, c.bytes);
1,180!
1347
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
1,180!
1348

1349
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
1,180✔
1350
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,180!
1351
    }
1352
  }
1353

1354
_end:
73,606✔
1355
  if (code != TSDB_CODE_SUCCESS) {
73,606!
1356
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1357
  }
1358
  *pRes = needed;
73,619✔
1359
  return code;
73,619✔
1360
}
1361

1362
/**
 * Build the hash-interval aggregate operator ("TimeIntervalAggOperator") for an interval physical plan node.
 *
 * @param downstream upstream operator that feeds this operator
 * @param pPhyNode   interval physical plan node (window functions, interval/sliding, limit, slimit, conditions)
 * @param pTaskInfo  owning task; pTaskInfo->code receives the error code on failure
 * @param pOptrInfo  out: the created operator on success
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed. On failure the partially built
 *         operator info is destroyed and destroyOperatorAndDownstreams() is called for the operator and
 *         the downstream.
 */
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pSup->hasWindowOrGroup = true;

  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 512);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                    &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  SInterval interval = {.interval = pPhyNode->interval,
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
                        .timeRange = pPhyNode->timeRange};
  calcIntervalAutoOffset(&interval);

  STimeWindowAggSupp as = {
      .waterMark = pPhyNode->window.watermark,
      .calTrigger = pPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
  };

  pInfo->win = pTaskInfo->window;
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
  pInfo->interval = interval;
  pInfo->twAggSup = as;
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;

  // LIMIT: the operator must keep limit + offset windows so that the offset can be skipped afterwards.
  if (pPhyNode->window.node.pLimit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
    pInfo->limited = true;
    pInfo->limit = pLimit->limit + pLimit->offset;
  }

  // SLIMIT: same idea, counted in groups; curGroupId starts at a sentinel value.
  if (pPhyNode->window.node.pSlimit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
    pInfo->slimited = true;
    pInfo->slimit = pLimit->limit + pLimit->offset;
    pInfo->curGroupId = UINT64_MAX;
  }

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->timeWindowInterpo = false;
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
  QUERY_CHECK_CODE(code, lino, _error);
  if (pInfo->timeWindowInterpo) {
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
    // Record the failure code: jumping to _error with code still TSDB_CODE_SUCCESS would report success
    // to the caller without handing back an operator.
    QUERY_CHECK_NULL(pInfo->binfo.resultRowInfo.openWindow, code, lino, _error, terrno);
  }

  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);

  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }

  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
1489

1490
// todo handle multiple timeline cases. assume no timeline interweaving
1491
// Splits the rows of one input block into session windows and applies the aggregate functions to each window.
//
// A row continues the currently open window when it has the same group id as the window and its timestamp is
// within pInfo->gap of the previous row's timestamp (in either direction, so both scan orders are accepted).
// Otherwise the rows collected so far are aggregated into the closed window and a new window is started.
// The rows of the last (still open) window in the block are aggregated before returning.
//
// pOperator  operator that owns the expression contexts and the task info
// pInfo      session operator state (gap, ts slot id, per-window row bookkeeping)
// pBlock     input data block; an empty block is ignored
//
// Errors are reported by long-jumping through pTaskInfo->env; the function does not return in that case.
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (!pColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int64_t gid = pBlock->info.id.groupId;

  int64_t gap = pInfo->gap;

  // first block seen by this operator: there is no previous timestamp yet
  if (!pInfo->reptScan) {
    pInfo->reptScan = true;
    pInfo->winSup.prevTs = INT64_MIN;
  }

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  // Nothing to aggregate. Without this guard the tail below would read tsList[-1].
  if (pBlock->info.rows <= 0) {
    return;
  }

  // In case of ascending or descending order scan data, only one time window needs to be kept for each table.
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
      // new group, or no row has been seen yet: open a window at this row
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
      doKeepTuple(pRowSup, tsList[j], gid);
    } else {
      // the gap is exceeded: close the current session window and start a new one
      if (pRowSup->numOfRows > 0) {  // handle the rows that belong to the previous session window
        SResultRow* pResult = NULL;

        // keep a copy of the time window that is being closed
        STimeWindow window = pRowSup->win;

        int32_t ret =
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
          T_LONG_JMP(pTaskInfo->env, ret);
        }

        // pRowSup->numOfRows rows starting at pRowSup->startRowIndex belong to the closed session window
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
        ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                              pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows,
                                              numOfOutput);
        if (ret != TSDB_CODE_SUCCESS) {
          T_LONG_JMP(pTaskInfo->env, ret);
        }
      }

      // here we start a new session window
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
    }
  }

  // aggregate the rows of the window that is still open at the end of the block
  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
27,272✔
1571

1572
// Produces the next result block of the session window operator.
//
// On the first call every downstream block is consumed and aggregated, the grouped result info is initialised
// and the operator switches to OP_RES_TO_RETURN. On that call and on every later one, result blocks are built
// and filtered until either a non-empty block is available or no results remain (the operator is then marked
// completed).
//
// ppRes receives the result block, or NULL when the block built last holds no rows.
// Returns TSDB_CODE_SUCCESS; any failure is logged, stored in pTaskInfo->code and raised via T_LONG_JMP.
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
  SExprSupp*               pSup = &pOperator->exprSupp;

  pInfo->cleanGroupResInfo = false;

  if (pOperator->status != OP_RES_TO_RETURN) {
    // first invocation: drain the downstream operator and aggregate everything it yields
    int64_t startUs = taosGetTimestampUs();
    int32_t order = pInfo->binfo.inputTsOrder;

    for (SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); pBlock != NULL;
         pBlock = getNextBlockFromDownstream(pOperator, 0)) {
      pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;

      SExprSupp* pScalarSup = &pInfo->scalarSupp;
      if (pScalarSup->pExprInfo != NULL) {
        // evaluate the scalar expressions in place before aggregating
        code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                     NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      // the pDataBlock are always the same one, no need to call this again
      code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
      QUERY_CHECK_CODE(code, lino, _end);

      doSessionWindowAggImpl(pOperator, pInfo, pBlock);
    }

    pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;

    // from now on the operator only hands out the buffered results
    pOperator->status = OP_RES_TO_RETURN;

    code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->cleanGroupResInfo = true;

    code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // build result blocks until one survives the filter or the results are exhausted
  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
  return code;
}
1671

1672
// todo: make this a non-blocking operator
1673
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
4,596✔
1674
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1675
  QRY_PARAM_CHECK(pOptrInfo);
4,596!
1676

1677
  int32_t                   code = TSDB_CODE_SUCCESS;
4,596✔
1678
  int32_t                   lino = 0;
4,596✔
1679
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
4,596!
1680
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4,596!
1681
  if (pInfo == NULL || pOperator == NULL) {
4,596!
1682
    code = terrno;
×
1683
    goto _error;
×
1684
  }
1685

1686
  pOperator->exprSupp.hasWindowOrGroup = true;
4,596✔
1687
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
4,596✔
1688
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
4,596✔
1689

1690
  if (pStateNode->window.pExprs != NULL) {
4,596✔
1691
    int32_t    numOfScalarExpr = 0;
3,067✔
1692
    SExprInfo* pScalarExprInfo = NULL;
3,067✔
1693
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
3,067✔
1694
    QUERY_CHECK_CODE(code, lino, _error);
3,067!
1695

1696
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
3,067✔
1697
    if (code != TSDB_CODE_SUCCESS) {
3,067!
1698
      goto _error;
×
1699
    }
1700
  }
1701

1702
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
4,596✔
1703
  pInfo->stateKey.type = pInfo->stateCol.type;
4,596✔
1704
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
4,596✔
1705
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
4,596!
1706
  if (pInfo->stateKey.pData == NULL) {
4,596!
1707
    goto _error;
×
1708
  }
1709
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
4,596✔
1710
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
4,596✔
1711

1712
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
4,596✔
1713
  if (code != TSDB_CODE_SUCCESS) {
4,596!
1714
    goto _error;
×
1715
  }
1716

1717
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
4,596✔
1718

1719
  int32_t    num = 0;
4,596✔
1720
  SExprInfo* pExprInfo = NULL;
4,596✔
1721
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
4,596✔
1722
  QUERY_CHECK_CODE(code, lino, _error);
4,596!
1723

1724
  initResultSizeInfo(&pOperator->resultInfo, 4096);
4,596✔
1725

1726
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
4,596✔
1727
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
4,596✔
1728
  if (code != TSDB_CODE_SUCCESS) {
4,596!
1729
    goto _error;
×
1730
  }
1731

1732
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
4,596✔
1733
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
4,596!
1734
  initBasicInfo(&pInfo->binfo, pResBlock);
4,596✔
1735
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
4,596✔
1736

1737
  pInfo->twAggSup =
4,596✔
1738
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
4,596✔
1739

1740
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
4,596✔
1741
  QUERY_CHECK_CODE(code, lino, _error);
4,595!
1742

1743
  pInfo->tsSlotId = tsSlotId;
4,595✔
1744
  pInfo->pOperator = pOperator;
4,595✔
1745
  pInfo->cleanGroupResInfo = false;
4,595✔
1746
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
4,595✔
1747
                  pTaskInfo);
1748
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
4,595✔
1749
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1750

1751
  code = appendDownstream(pOperator, &downstream, 1);
4,595✔
1752
  if (code != TSDB_CODE_SUCCESS) {
4,596!
1753
    goto _error;
×
1754
  }
1755

1756
  *pOptrInfo = pOperator;
4,596✔
1757
  return TSDB_CODE_SUCCESS;
4,596✔
1758

1759
_error:
×
1760
  if (pInfo != NULL) {
×
1761
    destroyStateWindowOperatorInfo(pInfo);
×
1762
  }
1763

1764
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1765
  pTaskInfo->code = code;
×
1766
  return code;
×
1767
}
1768

1769
void destroySWindowOperatorInfo(void* param) {
7,751✔
1770
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
7,751✔
1771
  if (pInfo == NULL) {
7,751!
1772
    return;
×
1773
  }
1774

1775
  cleanupBasicInfo(&pInfo->binfo);
7,751✔
1776
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
7,751✔
1777
  if (pInfo->pOperator) {
7,751!
1778
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
7,751✔
1779
                      pInfo->cleanGroupResInfo);
7,751✔
1780
    pInfo->pOperator = NULL;
7,751✔
1781
  }
1782

1783
  cleanupAggSup(&pInfo->aggSup);
7,751✔
1784
  cleanupExprSupp(&pInfo->scalarSupp);
7,751✔
1785

1786
  cleanupGroupResInfo(&pInfo->groupResInfo);
7,751✔
1787
  taosMemoryFreeClear(param);
7,751!
1788
}
1789

1790
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
7,747✔
1791
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1792
  QRY_PARAM_CHECK(pOptrInfo);
7,747!
1793

1794
  int32_t                  code = TSDB_CODE_SUCCESS;
7,747✔
1795
  int32_t                  lino = 0;
7,747✔
1796
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
7,747!
1797
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
7,749!
1798
  if (pInfo == NULL || pOperator == NULL) {
7,748!
1799
    code = terrno;
×
1800
    goto _error;
×
1801
  }
1802

1803
  pOperator->exprSupp.hasWindowOrGroup = true;
7,748✔
1804

1805
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
7,748✔
1806
  initResultSizeInfo(&pOperator->resultInfo, 4096);
7,748✔
1807

1808
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
7,748✔
1809
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
7,751!
1810
  initBasicInfo(&pInfo->binfo, pResBlock);
7,751✔
1811

1812
  int32_t      numOfCols = 0;
7,751✔
1813
  SExprInfo*   pExprInfo = NULL;
7,751✔
1814
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
7,751✔
1815
  QUERY_CHECK_CODE(code, lino, _error);
7,750!
1816

1817
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
7,750✔
1818
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
7,750✔
1819
  QUERY_CHECK_CODE(code, lino, _error);
7,749!
1820

1821
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
7,749✔
1822
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
7,749✔
1823
  pInfo->gap = pSessionNode->gap;
7,749✔
1824

1825
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
7,749✔
1826
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
7,748✔
1827
  QUERY_CHECK_CODE(code, lino, _error);
7,750!
1828

1829
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
7,750✔
1830
  pInfo->binfo.pRes = pResBlock;
7,750✔
1831
  pInfo->winSup.prevTs = INT64_MIN;
7,750✔
1832
  pInfo->reptScan = false;
7,750✔
1833
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
7,750✔
1834
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
7,750✔
1835

1836
  if (pSessionNode->window.pExprs != NULL) {
7,750✔
1837
    int32_t    numOfScalar = 0;
1✔
1838
    SExprInfo* pScalarExprInfo = NULL;
1✔
1839
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
1✔
1840
    QUERY_CHECK_CODE(code, lino, _error);
1!
1841

1842
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
1✔
1843
    QUERY_CHECK_CODE(code, lino, _error);
1!
1844
  }
1845

1846
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
7,750✔
1847
  QUERY_CHECK_CODE(code, lino, _error);
7,747!
1848

1849
  pInfo->pOperator = pOperator;
7,747✔
1850
  pInfo->cleanGroupResInfo = false;
7,747✔
1851
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
7,747✔
1852
                  pInfo, pTaskInfo);
1853
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
7,747✔
1854
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1855
  pOperator->pTaskInfo = pTaskInfo;
7,746✔
1856
  code = appendDownstream(pOperator, &downstream, 1);
7,746✔
1857
  QUERY_CHECK_CODE(code, lino, _error);
7,751!
1858

1859
  *pOptrInfo = pOperator;
7,751✔
1860
  return TSDB_CODE_SUCCESS;
7,751✔
1861

1862
_error:
×
1863
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
1864
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1865
  pTaskInfo->code = code;
×
1866
  return code;
×
1867
}
1868

1869
void destroyMAIOperatorInfo(void* param) {
8,725✔
1870
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
8,725✔
1871
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
8,725✔
1872
  taosMemoryFreeClear(param);
8,725!
1873
}
8,725✔
1874

1875
// Allocates a new result row from the aggregate supporter's result buffer and, when that succeeds, records the
// row's page id and offset as the current position in pResultRowInfo.
// Returns the new row, or NULL when getNewResultRow() fails (pResultRowInfo is left untouched in that case).
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
  SResultRow* pRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  if (pRow != NULL) {
    pResultRowInfo->cur = (SResultRowPosition){.pageId = pRow->pageId, .offset = pRow->offset};
  }

  return pRow;
}
1883

1884
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
11,297,909✔
1885
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
1886
  if (*pResult == NULL) {
11,297,909✔
1887
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
6,110✔
1888
    if (*pResult == NULL) {
6,110!
1889
      return terrno;
×
1890
    }
1891
  }
1892

1893
  // set time window for current result
1894
  (*pResult)->win = (*win);
11,297,909✔
1895
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
11,297,909✔
1896
}
1897

1898
// Aggregates one input block for the merge-aligned interval operator.
//
// Rows are grouped by runs of equal timestamp: every time the timestamp changes, the rows collected so far are
// aggregated, the single shared result row is finalized into pResultBlock and reset, and a new window starting
// at the new timestamp is opened. The run that is still open at the end of the block is aggregated but not
// finalized; miaInfo->curTs remembers its timestamp so the next block (or the caller) can close it.
//
// pOperatorInfo   operator whose info is a SMergeAlignedIntervalAggOperatorInfo
// pResultRowInfo  tracks the position of the current result row
// pBlock          input block
// pResultBlock    block that receives finalized rows
//
// Errors are reported by long-jumping through pTaskInfo->env.
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
  SInterval*     pInterval = &iaInfo->interval;

  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);

  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

  // a result from a previous block is still open
  if (miaInfo->curTs != INT64_MIN) {
    if (ts != miaInfo->curTs) {
      // this block starts at a different timestamp: close the pending result first
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
      miaInfo->curTs = ts;
    }
  } else {
    miaInfo->curTs = ts;
  }

  STimeWindow win = {0};
  win.skey = miaInfo->curTs;
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  int32_t currPos = startPos;

  STimeWindow currWin = win;
  while (++currPos < pBlock->info.rows) {
    if (tsCols[currPos] == miaInfo->curTs) {
      continue;
    }

    // the timestamp changed: aggregate rows [startPos, currPos) into the window that is being closed
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
    miaInfo->curTs = tsCols[currPos];

    currWin.skey = miaInfo->curTs;
    currWin.ekey =
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

    startPos = currPos;
    // Bind the result row to the window that was just opened. The previous code passed `win`, i.e. the first
    // window of the block, which left a stale window in the result row for every later timestamp.
    ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    miaInfo->curTs = currWin.skey;
  }

  // aggregate the trailing run; it stays open until the timestamp or the group changes
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
44,383✔
1970

1971
// Stamps the finished group's id onto the result block and clears the per-group state of the operator:
// curTs goes back to INT64_MIN and groupId to 0.
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
  // hand the group id over to the result block before it is cleared
  uint64_t finishedGroupId = pMiaInfo->groupId;
  pRes->info.id.groupId = finishedGroupId;

  pMiaInfo->groupId = 0;
  pMiaInfo->curTs = INT64_MIN;
}
22,377✔
1976

1977
// Pulls blocks from the downstream operator and aggregates them until a result block is ready.
//
// The loop stops when
//   - the downstream is exhausted (the last open window is closed and the operator is marked completed),
//   - a block of a different group arrives and the previous group left rows after filtering (the new block is
//     kept in prefetchedBlock for the next call), or
//   - the result block holds at least resultInfo.capacity rows.
//
// Failures are logged, stored in pTaskInfo->code and raised via T_LONG_JMP.
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
  int32_t                               code = TSDB_CODE_SUCCESS;
  int32_t                               lino = 0;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;

  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;

  while (1) {
    SSDataBlock* pBlock = NULL;
    if (pMiaInfo->prefetchedBlock == NULL) {
      pBlock = getNextBlockFromDownstream(pOperator, 0);
    } else {
      // resume with the block that was set aside when the group changed
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;

      pMiaInfo->groupId = pBlock->info.id.groupId;
    }

    // no data exists, all query processing is done
    if (pBlock == NULL) {
      // close last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      setOperatorCompleted(pOperator);
      break;
    }

    if (pMiaInfo->groupId == 0) {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    } else {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        // if there is an unclosed time window, close it first
        if (pMiaInfo->curTs == INT64_MIN) {
          // A group is active but no window is open: internal inconsistency. Raise this specific code;
          // the previous code jumped with terrno, which is unrelated to this condition and may be 0.
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          lino = __LINE__;
          goto _end;
        }
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));

        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
        if (pRes->info.rows == 0) {
          // After filtering for last group, the result is empty, so we need to continue to process next group
          continue;
        } else {
          break;
        }
      } else {
        // same group: keep accumulating
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
37,529✔
2067

2068
// Returns the next result block of the merge-aligned interval operator.
//
// The result block is cleared and then filled by doMergeAlignedIntervalAgg(). When binfo.mergeResultBlock is
// set, filling is repeated until the operator is done or the block reaches resultInfo.threshold rows;
// otherwise a single round is performed.
//
// ppRes receives the result block, or NULL when the operator is already done or no rows were produced.
// Always returns TSDB_CODE_SUCCESS.
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                               code = TSDB_CODE_SUCCESS;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);

  if (!iaInfo->binfo.mergeResultBlock) {
    doMergeAlignedIntervalAgg(pOperator);
  } else {
    // keep merging rounds into the same block until it is full enough or the input is exhausted
    while (pOperator->status != OP_EXEC_DONE && pRes->info.rows < pOperator->resultInfo.threshold) {
      doMergeAlignedIntervalAgg(pOperator);
    }
  }

  size_t numOfRows = pRes->info.rows;
  pOperator->resultInfo.totalRows += numOfRows;
  if (numOfRows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pRes;
  }
  return code;
}
2102

2103
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
8,725✔
2104
                                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2105
  QRY_PARAM_CHECK(pOptrInfo);
8,725!
2106

2107
  int32_t                               code = TSDB_CODE_SUCCESS;
8,725✔
2108
  int32_t                               lino = 0;
8,725✔
2109
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
8,725!
2110
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
8,725!
2111
  if (miaInfo == NULL || pOperator == NULL) {
8,725!
2112
    code = terrno;
×
2113
    goto _error;
×
2114
  }
2115

2116
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
8,725!
2117
  if (miaInfo->intervalAggOperatorInfo == NULL) {
8,725!
2118
    code = terrno;
×
2119
    goto _error;
×
2120
  }
2121

2122
  SInterval interval = {.interval = pNode->interval,
8,725✔
2123
                        .sliding = pNode->sliding,
8,725✔
2124
                        .intervalUnit = pNode->intervalUnit,
8,725✔
2125
                        .slidingUnit = pNode->slidingUnit,
8,725✔
2126
                        .offset = pNode->offset,
8,725✔
2127
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
8,725✔
2128
                        .timeRange = pNode->timeRange};
2129
  calcIntervalAutoOffset(&interval);
8,725✔
2130

2131
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
8,725✔
2132
  SExprSupp*                pSup = &pOperator->exprSupp;
8,725✔
2133
  pSup->hasWindowOrGroup = true;
8,725✔
2134

2135
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
8,725✔
2136
  QUERY_CHECK_CODE(code, lino, _error);
8,725!
2137

2138
  miaInfo->curTs = INT64_MIN;
8,725✔
2139
  iaInfo->win = pTaskInfo->window;
8,725✔
2140
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
8,725✔
2141
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
8,725✔
2142
  iaInfo->interval = interval;
8,725✔
2143
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
8,725✔
2144
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
8,725✔
2145

2146
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
8,725✔
2147
  initResultSizeInfo(&pOperator->resultInfo, 512);
8,725✔
2148

2149
  int32_t    num = 0;
8,725✔
2150
  SExprInfo* pExprInfo = NULL;
8,725✔
2151
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
8,725✔
2152
  QUERY_CHECK_CODE(code, lino, _error);
8,725!
2153

2154
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
8,725✔
2155
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
8,725✔
2156
  QUERY_CHECK_CODE(code, lino, _error);
8,724!
2157

2158
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
8,724✔
2159
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
8,725!
2160
  initBasicInfo(&iaInfo->binfo, pResBlock);
8,725✔
2161
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
8,725✔
2162
  QUERY_CHECK_CODE(code, lino, _error);
8,725!
2163

2164
  iaInfo->timeWindowInterpo = false;
8,725✔
2165
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
8,725✔
2166
  QUERY_CHECK_CODE(code, lino, _error);
8,725!
2167
  if (iaInfo->timeWindowInterpo) {
8,725!
2168
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2169
  }
2170

2171
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
8,725✔
2172
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
8,725✔
2173
  QUERY_CHECK_CODE(code, lino, _error);
8,725!
2174
  iaInfo->pOperator = pOperator;
8,725✔
2175
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
8,725✔
2176
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2177

2178
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
8,725✔
2179
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2180

2181
  code = appendDownstream(pOperator, &downstream, 1);
8,725✔
2182
  QUERY_CHECK_CODE(code, lino, _error);
8,725!
2183

2184
  *pOptrInfo = pOperator;
8,725✔
2185
  return TSDB_CODE_SUCCESS;
8,725✔
2186

2187
_error:
×
2188
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2189
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2190
  pTaskInfo->code = code;
×
2191
  return code;
×
2192
}
2193

2194
//=====================================================================================================================
2195
// merge interval operator
2196
typedef struct SMergeIntervalAggOperatorInfo {
2197
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2198
  SList*                   groupIntervals;
2199
  SListIter                groupIntervalsIter;
2200
  bool                     hasGroupId;
2201
  uint64_t                 groupId;
2202
  SSDataBlock*             prefetchedBlock;
2203
  bool                     inputBlocksFinished;
2204
} SMergeIntervalAggOperatorInfo;
2205

2206
typedef struct SGroupTimeWindow {
2207
  uint64_t    groupId;
2208
  STimeWindow window;
2209
} SGroupTimeWindow;
2210

2211
void destroyMergeIntervalOperatorInfo(void* param) {
×
2212
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2213
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2214
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2215

2216
  taosMemoryFreeClear(param);
×
2217
}
×
2218

2219
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2220
                                        STimeWindow* newWin) {
2221
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2222
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2223
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2224

2225
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2226
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2227
  if (code != TSDB_CODE_SUCCESS) {
×
2228
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2229
    return code;
×
2230
  }
2231

2232
  SListIter iter = {0};
×
2233
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2234
  SListNode* listNode = NULL;
×
2235
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2236
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2237
    if (prevGrpWin->groupId != tableGroupId) {
×
2238
      continue;
×
2239
    }
2240

2241
    STimeWindow* prevWin = &prevGrpWin->window;
×
2242
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2243
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2244
      taosMemoryFreeClear(tmp);
×
2245
    }
2246
  }
2247

2248
  return TSDB_CODE_SUCCESS;
×
2249
}
2250

2251
// Applies the aggregate functions of the operator to one input block, window by window.
//
// The window containing the first timestamp of the block is processed first; afterwards the function keeps asking
// getNextQualifiedWindow() for the next window that has rows in the block until it reports none. Each processed
// window is closed and handed to outputPrevIntervalResult().
//
// pOperatorInfo:  operator whose info is a SMergeIntervalAggOperatorInfo
// pResultRowInfo: result row bookkeeping passed through to the window helpers
// pBlock:         input block to aggregate
// scanFlag:       scan flag of the block; compared against MAIN_SCAN when the output buffer is set up
// pResultBlock:   forwarded to outputPrevIntervalResult()
//
// Nothing is returned; every failure leaves the function through T_LONG_JMP on pTaskInfo->env.
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* pMergeInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      pInterval = &pMergeInfo->intervalAggOperatorInfo;
  SExecTaskInfo*                 pTask = pOperatorInfo->pTaskInfo;
  SExprSupp*                     pSup = &pOperatorInfo->exprSupp;

  int32_t     rowPos = 0;
  int32_t     numOfExprs = pSup->numOfExprs;
  int64_t*    pTsList = extractTsCol(pBlock, pInterval, pTask);
  uint64_t    groupId = pBlock->info.id.groupId;
  bool        isAsc = (pInterval->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       firstTs = getStartTsKey(&pBlock->info.window, pTsList);
  SResultRow* pRow = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;

  // ---- first window: the one that contains the first timestamp of the block ----
  STimeWindow curWin = getActiveTimeWindow(pInterval->aggSup.pResultBuf, pResultRowInfo, firstTs,
                                           &pInterval->interval, pInterval->binfo.inputTsOrder);

  code = setTimeWindowOutputBuf(pResultRowInfo, &curWin, (scanFlag == MAIN_SCAN), &pRow, groupId, pSup->pCtx,
                                numOfExprs, pSup->rowEntryInfoOffset, &pInterval->aggSup, pTask);
  if (code != TSDB_CODE_SUCCESS || pRow == NULL) {
    T_LONG_JMP(pTask->env, code);
  }

  // the boundary key is the window end for ascending input and the window start otherwise
  TSKEY   boundKey = isAsc ? curWin.ekey : curWin.skey;
  int32_t rowsInWin = getNumOfRowsInTimeWindow(&pBlock->info, pTsList, rowPos, boundKey, binarySearchForKey, NULL,
                                               pInterval->binfo.inputTsOrder);
  if (rowsInWin <= 0) {
    T_LONG_JMP(pTask->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (pInterval->timeWindowInterpo) {
    // interpolate the windows that are still open before touching the current one
    SResultRowPosition openPos = addToOpenWindowList(pResultRowInfo, pRow, groupId, pTask);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfExprs, pResultRowInfo, pBlock, scanFlag, pTsList, &openPos);

    // switch the output buffer back to the current window
    code = setTimeWindowOutputBuf(pResultRowInfo, &curWin, (scanFlag == MAIN_SCAN), &pRow, groupId, pSup->pCtx,
                                  numOfExprs, pSup->rowEntryInfoOffset, &pInterval->aggSup, pTask);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTask->env, code);
    }

    // interpolate at the border of the current window
    code = doWindowBorderInterpolation(pInterval, pBlock, pRow, &curWin, rowPos, rowsInWin, pSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTask->env, code);
    }
  }

  updateTimeWindowInfo(&pInterval->twAggSup.timeWindowData, &curWin, 1);
  code = applyAggFunctionOnPartialTuples(pTask, pSup->pCtx, &pInterval->twAggSup.timeWindowData, rowPos, rowsInWin,
                                         pBlock->info.rows, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTask->env, code);
  }
  doCloseWindow(pResultRowInfo, pInterval, pRow);

  // the current window is closed now, let the bookkeeping see it
  code = outputPrevIntervalResult(pOperatorInfo, groupId, pResultBlock, &curWin);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    T_LONG_JMP(pTask->env, code);
  }

  // ---- remaining windows that have rows in this block ----
  STimeWindow slideWin = curWin;
  for (;;) {
    int32_t lastPos = rowsInWin - 1 + rowPos;
    rowPos = getNextQualifiedWindow(&pInterval->interval, &slideWin, &pBlock->info, pTsList, lastPos,
                                    pInterval->binfo.inputTsOrder);
    if (rowPos < 0) {
      break;  // no further window has data in this block
    }

    // a NULL result row means no buffer could be obtained for the window
    code = setTimeWindowOutputBuf(pResultRowInfo, &slideWin, (scanFlag == MAIN_SCAN), &pRow, groupId, pSup->pCtx,
                                  numOfExprs, pSup->rowEntryInfoOffset, &pInterval->aggSup, pTask);
    if (code != TSDB_CODE_SUCCESS || pRow == NULL) {
      T_LONG_JMP(pTask->env, code);
    }

    boundKey = isAsc ? slideWin.ekey : slideWin.skey;
    rowsInWin = getNumOfRowsInTimeWindow(&pBlock->info, pTsList, rowPos, boundKey, binarySearchForKey, NULL,
                                         pInterval->binfo.inputTsOrder);

    // interpolate at the start (end) key of the window
    code = doWindowBorderInterpolation(pInterval, pBlock, pRow, &slideWin, rowPos, rowsInWin, pSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTask->env, code);
    }

    updateTimeWindowInfo(&pInterval->twAggSup.timeWindowData, &slideWin, 1);
    code = applyAggFunctionOnPartialTuples(pTask, pSup->pCtx, &pInterval->twAggSup.timeWindowData, rowPos,
                                           rowsInWin, pBlock->info.rows, numOfExprs);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTask->env, code);
    }
    doCloseWindow(pResultRowInfo, pInterval, pRow);

    // this window is closed as well
    code = outputPrevIntervalResult(pOperatorInfo, groupId, pResultBlock, &slideWin);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      T_LONG_JMP(pTask->env, code);
    }
  }

  // keep the last row of the block around for interpolation
  if (pInterval->timeWindowInterpo) {
    saveDataBlockLastRow(pInterval->pPrevValues, pBlock, pInterval->pInterpCols);
  }
}
×
2365

2366
// Produces the next result block of the merge interval operator.
//
// While input remains, blocks of the current group are pulled from downstream 0 and aggregated. The loop stops when
// the downstream is exhausted, when a block of a different group shows up (that block is kept in prefetchedBlock
// for the next call), or when the result block has reached the operator's row threshold.
//
// pOperator: operator whose info is a SMergeIntervalAggOperatorInfo
// ppRes:     receives the result block, or NULL when the result block holds no rows
//
// Returns TSDB_CODE_SUCCESS. On failure the code is stored in pTaskInfo->code and control leaves through
// T_LONG_JMP on pTaskInfo->env.
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                        code = TSDB_CODE_SUCCESS;
  int32_t                        lino = 0;
  size_t                         rows = 0;
  SExecTaskInfo*                 pTask = pOperator->pTaskInfo;
  SMergeIntervalAggOperatorInfo* pMergeInfo = pOperator->info;
  SIntervalAggOperatorInfo*      pInterval = &pMergeInfo->intervalAggOperatorInfo;
  SExprSupp*                     pSup = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pOut = pInterval->binfo.pRes;
  blockDataCleanup(pOut);
  code = blockDataEnsureCapacity(pOut, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!pMergeInfo->inputBlocksFinished) {
    for (;;) {
      // a block held back by the previous call takes precedence over fetching a new one
      SSDataBlock* pInput = pMergeInfo->prefetchedBlock;
      if (pInput != NULL) {
        pMergeInfo->groupId = pInput->info.id.groupId;
        pMergeInfo->prefetchedBlock = NULL;
      } else {
        pInput = getNextBlockFromDownstream(pOperator, 0);
      }

      if (pInput == NULL) {
        // input exhausted: prepare the walk over the recorded group windows
        tdListInitIter(pMergeInfo->groupIntervals, &pMergeInfo->groupIntervalsIter, TD_LIST_FORWARD);
        pMergeInfo->inputBlocksFinished = true;
        break;
      }

      if (!pMergeInfo->hasGroupId) {
        pMergeInfo->hasGroupId = true;
        pMergeInfo->groupId = pInput->info.id.groupId;
      } else if (pMergeInfo->groupId != pInput->info.id.groupId) {
        // the block belongs to another group, keep it for the next call
        pMergeInfo->prefetchedBlock = pInput;
        break;
      }

      pOut->info.scanFlag = pInput->info.scanFlag;
      code = setInputDataBlock(pSup, pInput, pInterval->binfo.inputTsOrder, pInput->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      doMergeIntervalAggImpl(pOperator, &pInterval->binfo.resultRowInfo, pInput, pInput->info.scanFlag, pOut);

      if (pOut->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

    pOut->info.id.groupId = pMergeInfo->groupId;
  }

  if (pMergeInfo->inputBlocksFinished) {
    // tag the result with the group of the next recorded window, if there is one
    SListNode* pNode = tdListNext(&pMergeInfo->groupIntervalsIter);
    if (pNode != NULL) {
      SGroupTimeWindow* pGroupWin = (SGroupTimeWindow*)(pNode->data);
      pOut->info.id.groupId = pGroupWin->groupId;
    }
  }

  if (pOut->info.rows == 0) {
    setOperatorCompleted(pOperator);
  }

  rows = pOut->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTask->code = code;
    T_LONG_JMP(pTask->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pOut;
  return code;
}
2450

2451
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2452
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2453
  QRY_PARAM_CHECK(pOptrInfo);
×
2454

2455
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2456
  int32_t                        lino = 0;
×
2457
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2458
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2459
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2460
    code = terrno;
×
2461
    goto _error;
×
2462
  }
2463

2464
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2465
                        .sliding = pIntervalPhyNode->sliding,
×
2466
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2467
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2468
                        .offset = pIntervalPhyNode->offset,
×
2469
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2470
                        .timeRange = pIntervalPhyNode->timeRange};
2471
  calcIntervalAutoOffset(&interval);
×
2472

2473
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2474

2475
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2476
  pIntervalInfo->win = pTaskInfo->window;
×
2477
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2478
  pIntervalInfo->interval = interval;
×
2479
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2480
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2481
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2482

2483
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2484
  pExprSupp->hasWindowOrGroup = true;
×
2485

2486
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2487
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2488

2489
  int32_t    num = 0;
×
2490
  SExprInfo* pExprInfo = NULL;
×
2491
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2492
  QUERY_CHECK_CODE(code, lino, _error);
×
2493

2494
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2495
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2496
  if (code != TSDB_CODE_SUCCESS) {
×
2497
    goto _error;
×
2498
  }
2499

2500
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2501
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2502
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2503
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2504
  QUERY_CHECK_CODE(code, lino, _error);
×
2505

2506
  pIntervalInfo->timeWindowInterpo = false;
×
2507
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2508
  QUERY_CHECK_CODE(code, lino, _error);
×
2509
  if (pIntervalInfo->timeWindowInterpo) {
×
2510
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2511
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2512
      goto _error;
×
2513
    }
2514
  }
2515

2516
  pIntervalInfo->pOperator = pOperator;
×
2517
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2518
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2519
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2520
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2521
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2522

2523
  code = appendDownstream(pOperator, &downstream, 1);
×
2524
  if (code != TSDB_CODE_SUCCESS) {
×
2525
    goto _error;
×
2526
  }
2527

2528
  *pOptrInfo = pOperator;
×
2529
  return TSDB_CODE_SUCCESS;
×
2530
_error:
×
2531
  if (pMergeIntervalInfo != NULL) {
×
2532
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2533
  }
2534
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2535
  pTaskInfo->code = code;
×
2536
  return code;
×
2537
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc