• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3525

10 Nov 2024 03:50AM UTC coverage: 60.818% (-0.08%) from 60.898%
#3525

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

118634 of 249004 branches covered (47.64%)

Branch coverage included in aggregate %.

136 of 169 new or added lines in 23 files covered. (80.47%)

542 existing lines in 129 files now uncovered.

199071 of 273386 relevant lines covered (72.82%)

15691647.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.42
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "tchecksum.h"
22
#include "tcommon.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "tfill.h"
26
#include "tglobal.h"
27
#include "tlog.h"
28
#include "ttime.h"
29

30
typedef struct SSessionAggOperatorInfo {
31
  SOptrBasicInfo     binfo;
32
  SAggSupporter      aggSup;
33
  SExprSupp          scalarSupp;  // supporter for perform scalar function
34
  SGroupResInfo      groupResInfo;
35
  SWindowRowsSup     winSup;
36
  bool               reptScan;  // next round scan
37
  int64_t            gap;       // session window gap
38
  int32_t            tsSlotId;  // primary timestamp slot id
39
  STimeWindowAggSupp twAggSup;
40
  SOperatorInfo*     pOperator;
41
  bool               cleanGroupResInfo;
42
} SSessionAggOperatorInfo;
43

44
typedef struct SStateWindowOperatorInfo {
45
  SOptrBasicInfo     binfo;
46
  SAggSupporter      aggSup;
47
  SExprSupp          scalarSup;
48
  SGroupResInfo      groupResInfo;
49
  SWindowRowsSup     winSup;
50
  SColumn            stateCol;  // start row index
51
  bool               hasKey;
52
  SStateKeys         stateKey;
53
  int32_t            tsSlotId;  // primary timestamp column slot id
54
  STimeWindowAggSupp twAggSup;
55
  SOperatorInfo*     pOperator;
56
  bool               cleanGroupResInfo;
57
} SStateWindowOperatorInfo;
58

59
// Selects which border of a result row's time window an interpolation helper works on.
typedef enum SResultTsInterpType {
  RESULT_ROW_START_INTERP = 1,  // window start border
  RESULT_ROW_END_INTERP = 2,    // window end border
} SResultTsInterpType;
63

64
typedef struct SOpenWindowInfo {
65
  SResultRowPosition pos;
66
  uint64_t           groupId;
67
} SOpenWindowInfo;
68

69
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
70

71
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
72
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
73
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
74

75
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
257,820,136✔
76
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
77
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
78
                                      SExecTaskInfo* pTaskInfo) {
79
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
257,820,136✔
80
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
81

82
  if (pResultRow == NULL || pTaskInfo->code != 0) {
259,123,362!
83
    *pResult = NULL;
×
84
    return pTaskInfo->code;
×
85
  }
86

87
  // set time window for current result
88
  pResultRow->win = (*win);
259,198,826✔
89

90
  *pResult = pResultRow;
259,198,826✔
91
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
259,198,826✔
92
}
93

94
// Account for one more row in the window being tracked: the row's timestamp becomes both
// the window end key and the "previous" timestamp, the row counter grows by one and the
// group id is recorded.
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->numOfRows++;
  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
}
100

101
// Start tracking a new window at rowIndex: remember the start row, take the window start
// key from tsList[rowIndex], reset the row counter to zero and record the group id.
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
                                     uint64_t groupId) {
  const int64_t startTs = tsList[rowIndex];

  pRowSup->groupId = groupId;
  pRowSup->win.skey = startTs;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = rowIndex;
}
108

109
/**
 * Count how many rows, starting at pos, can be consumed for a window that ends at ekey.
 *
 * searchFn is run over the sub-array pData[pos .. numOfRows-1]; its result is an offset
 * relative to pos. Every following row whose timestamp equals ekey is counted as well.
 *
 * @param numOfRows number of timestamps in pData
 * @param searchFn  search routine, invoked as searchFn(&pData[pos], numOfRows - pos, ekey, order)
 * @param ekey      end key of the window
 * @param pos       index of the first row to consider
 * @param order     scan order, passed through to searchFn (the counting itself is the same
 *                  for both orders)
 * @param pData     timestamp array
 * @return number of rows to move forward; 0 when searchFn returns a negative value.
 */
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
  int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
  if (end < 0) {
    return 0;
  }

  int32_t forwardRows = end;

  // Include the rows whose timestamp equals ekey. The scan is bounded by numOfRows so that a
  // block ending in a run of ekey values cannot be read past its last row.
  for (int32_t i = pos + end; i < numOfRows && pData[i] == ekey; ++i) {
    forwardRows += 1;
  }

  return forwardRows;
}
145

146
/**
 * Binary search over an array of TSKEY values.
 *
 * @param pValue array of TSKEY, passed as char*
 * @param num    number of keys in the array
 * @param key    key to look for
 * @param order  TSDB_ORDER_DESC treats the array as descending; any other value is handled
 *               as ascending
 * @return for descending input, the first position whose key is not larger than `key`; for
 *         ascending input, the first position whose key is not smaller than `key`; -1 when
 *         num <= 0 or when that position would fall beyond the last element.
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  TSKEY*     keys = (TSKEY*)pValue;
  int32_t    lo = 0;
  int32_t    hi = num - 1;
  const bool desc = (order == TSDB_ORDER_DESC);

  for (;;) {
    // the key is at, or in front of, the current lower bound
    if (desc ? (key >= keys[lo]) : (key <= keys[lo])) {
      return lo;
    }

    if (key == keys[hi]) {
      return hi;
    }

    // the key lies behind the current upper bound: the answer is the slot right after it
    if (desc ? (key < keys[hi]) : (key > keys[hi])) {
      int32_t next = hi + 1;
      return (next >= num) ? -1 : next;
    }

    int32_t span = hi - lo + 1;
    int32_t mid = (span >> 1) + lo;

    if (key == keys[mid]) {
      return mid;
    }

    // descending: smaller keys are to the right; ascending: larger keys are to the right
    bool moveRight = desc ? (key < keys[mid]) : (key > keys[mid]);
    if (moveRight) {
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
}
214

215
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
243,230,784✔
216
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
217
  int32_t num = -1;
243,230,784✔
218
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
243,230,784✔
219

220
  if (order == TSDB_ORDER_ASC) {
243,230,784✔
221
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
192,116,602✔
222
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
187,999,031!
223
      if (item != NULL) {
190,623,422!
224
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
225
      }
226
    } else {
227
      num = pDataBlockInfo->rows - startPos;
4,117,571✔
228
      if (item != NULL) {
4,117,571!
229
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
230
      }
231
    }
232
  } else {  // desc
233
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
51,114,182!
234
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
49,738,036!
235
      if (item != NULL) {
49,720,641!
236
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
237
      }
238
    } else {
239
      num = pDataBlockInfo->rows - startPos;
1,376,146✔
240
      if (item != NULL) {
1,376,146!
241
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
242
      }
243
    }
244
  }
245

246
  return num;
245,837,780✔
247
}
248

249
/**
 * Compute the interpolated value at a window border for every function that needs it.
 *
 * For each expression that is not an interval-interpolation function, start.key is set to
 * INT64_MIN and the expression is skipped. Otherwise the values at the previous and the
 * current row are read as doubles, a linear interpolation at windowKey is computed (skipped
 * for the elapsed function) and the result is stored in the start or the end slot of the
 * function context, depending on `type`.
 *
 * @param pPrevValues  saved values of the previous block; read only when prevRowIndex == -1
 * @param pDataBlock   array of SColumnInfoData
 * @param prevTs       timestamp of the previous point
 * @param prevRowIndex row index of the previous point, or -1 to use pPrevValues
 * @param curTs        timestamp of the current point
 * @param curRowIndex  row index of the current point
 * @param windowKey    timestamp at which to interpolate
 * @param type         RESULT_ROW_START_INTERP fills ctx.start, anything else fills ctx.end
 * @param pSup         expression support holding the function contexts
 */
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
  SqlFunctionCtx* pCtxList = pSup->pCtx;

  // index into pPrevValues of the saved value for the next interpolating function
  int32_t prevValIdx = 1;

  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
    SqlFunctionCtx* pFuncCtx = &pCtxList[i];

    if (!fmIsIntervalInterpoFunc(pFuncCtx->functionId)) {
      pFuncCtx->start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pParam = &pFuncCtx->param[0];
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

    double prevVal = 0;
    double curVal = 0;
    double interpVal = 0;

    if (prevRowIndex == -1) {
      SGroupKeys* pSaved = taosArrayGet(pPrevValues, prevValIdx);
      GET_TYPED_DATA(prevVal, double, pColInfo->info.type, pSaved->pData);
    } else {
      GET_TYPED_DATA(prevVal, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex));
    }

    GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));

    SPoint left = (SPoint){.key = prevTs, .val = &prevVal};
    SPoint right = (SPoint){.key = curTs, .val = &curVal};
    SPoint target = (SPoint){.key = windowKey, .val = &interpVal};

    if (!fmIsElapsedFunc(pFuncCtx->functionId)) {
      taosGetLinearInterpolationVal(&target, TSDB_DATA_TYPE_DOUBLE, &left, &right, TSDB_DATA_TYPE_DOUBLE);
    }

    if (type == RESULT_ROW_START_INTERP) {
      pFuncCtx->start.key = target.key;
      pFuncCtx->start.val = interpVal;
    } else {
      pFuncCtx->end.key = target.key;
      pFuncCtx->end.val = interpVal;
    }

    prevValIdx += 1;
  }
}
318

319
// Mark one window border as "not interpolated" in every function context by storing
// INT64_MIN in start.key (type == RESULT_ROW_START_INTERP) or in end.key (any other type).
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  const bool startBorder = (type == RESULT_ROW_START_INTERP);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (startBorder) {
      pCtx[i].start.key = INT64_MIN;
    } else {
      pCtx[i].end.key = INT64_MIN;
    }
  }
}
330

331
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
11,412,347✔
332
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
333
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
11,412,347✔
334

335
  TSKEY curTs = tsCols[pos];
11,412,347✔
336

337
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
11,412,347✔
338
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
11,412,166✔
339

340
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
341
  // start exactly from this point, no need to do interpolation
342
  TSKEY key = ascQuery ? win->skey : win->ekey;
11,412,166!
343
  if (key == curTs) {
11,412,166✔
344
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
704,027✔
345
    return true;
704,027✔
346
  }
347

348
  // it is the first time window, no need to do interpolation
349
  if (pTsKey->isNull && pos == 0) {
10,708,139!
350
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
152,647✔
351
  } else {
352
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
10,555,492!
353
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
10,555,492✔
354
                              RESULT_ROW_START_INTERP, pSup);
355
  }
356

357
  return true;
10,708,240✔
358
}
359

360
/**
 * Prepare the end-border interpolation for the window `win`.
 *
 * @param endRowIndex  index of the last row that belongs to the window
 * @param nextRowIndex index of the row following the window; must not be negative when an
 *                     interpolation is required
 * @param blockEkey    end key of the block in scan direction
 * @param pRes         [out] false when the window does not end inside the current block,
 *                     true when the end border has been handled; left untouched on error
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when nextRowIndex < 0.
 */
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                            int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
                                            TSKEY blockEkey, STimeWindow* win, bool* pRes) {
  const int32_t order = pInfo->binfo.inputTsOrder;
  const TSKEY   lastRowTs = tsCols[endRowIndex];
  const TSKEY   border = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;

  // the window continues in a later block: nothing to interpolate yet
  bool pastBlockAsc = (border > blockEkey) && (order == TSDB_ORDER_ASC);
  bool pastBlockDesc = (border < blockEkey) && (order == TSDB_ORDER_DESC);
  if (pastBlockAsc || pastBlockDesc) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = false;
    return TSDB_CODE_SUCCESS;
  }

  // a row sits exactly on the border, so its value is used as is
  if (border == lastRowTs) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = true;
    return TSDB_CODE_SUCCESS;
  }

  if (nextRowIndex < 0) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  const TSKEY nextTs = tsCols[nextRowIndex];
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, lastRowTs, endRowIndex, nextTs, nextRowIndex, border,
                            RESULT_ROW_END_INTERP, pSup);
  *pRes = true;
  return TSDB_CODE_SUCCESS;
}
395

396
/**
 * Tell whether window pWin has to be calculated for the range [calStart, calEnd].
 *
 * Windows of an interval whose sliding equals its length are always accepted. Otherwise a
 * window is rejected when it lies completely outside the range, or when the block type is
 * STREAM_PULL_DATA and the window starts before calStart.
 */
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
  if (pInterval->interval == pInterval->sliding) {
    return true;
  }

  if (pWin->ekey < calStart || pWin->skey > calEnd) {
    return false;
  }

  if (blockType == STREAM_PULL_DATA && pWin->skey < calStart) {
    return false;
  }

  return true;
}
404

405
// Same check as inCalSlidingWindow, with the range and the block type taken from pBlockInfo.
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  TSKEY calStart = pBlockInfo->calWin.skey;
  TSKEY calEnd = pBlockInfo->calWin.ekey;

  return inCalSlidingWindow(pInterval, pWin, calStart, calEnd, pBlockInfo->type);
}
408

409
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
244,124,810✔
410
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
411
  bool ascQuery = (order == TSDB_ORDER_ASC);
244,124,810✔
412

413
  int32_t precision = pInterval->precision;
244,124,810✔
414
  getNextTimeWindow(pInterval, pNext, order);
244,124,810✔
415

416
  // next time window is not in current block
417
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
243,987,414!
418
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
241,763,515✔
419
    return -1;
2,742,309✔
420
  }
421

422
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
241,245,105!
423
    return -1;
30✔
424
  }
425

426
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
241,393,489✔
427
  int32_t startPos = 0;
241,393,489✔
428

429
  // tumbling time window query, a special case of sliding time window query
430
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
241,393,489!
431
    startPos = prevPosition + 1;
125,580,563✔
432
  } else {
433
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
115,812,926✔
434
      startPos = 0;
3,093,367✔
435
    } else {
436
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
112,719,559✔
437
    }
438
  }
439

440
  /* interp query with fill should not skip time window */
441
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
442
  //    return startPos;
443
  //  }
444

445
  /*
446
   * This time window does not cover any data, try next time window,
447
   * this case may happen when the time window is too small
448
   */
449
  if (primaryKeys != NULL) {
241,861,352!
450
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
312,792,780✔
451
      TSKEY next = primaryKeys[startPos];
70,869,668✔
452
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
70,869,668✔
453
        pNext->skey = taosTimeTruncate(next, pInterval);
61,114✔
454
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
2✔
455
      } else {
456
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
70,808,554✔
457
        pNext->skey = pNext->ekey - pInterval->interval + 1;
70,808,554✔
458
      }
459
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
171,060,249✔
460
      TSKEY next = primaryKeys[startPos];
24,230,072✔
461
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
24,230,072!
462
        pNext->skey = taosTimeTruncate(next, pInterval);
614✔
463
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
464
      } else {
465
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
24,229,458✔
466
        pNext->ekey = pNext->skey + pInterval->interval - 1;
24,229,458✔
467
      }
468
    }
469
  }
470

471
  return startPos;
241,853,933✔
472
}
473

474
// Report whether the given border of the result row is already flagged as interpolated:
// startInterp for RESULT_ROW_START_INTERP, endInterp for any other type.
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
  return (type == RESULT_ROW_START_INTERP) ? (pResult->startInterp == true) : (pResult->endInterp == true);
}
481

482
// Flag the given border of the result row as interpolated: startInterp for
// RESULT_ROW_START_INTERP, endInterp for any other type.
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  if (type != RESULT_ROW_START_INTERP) {
    pResult->endInterp = true;
    return;
  }

  pResult->startInterp = true;
}
489

490
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
229,376,093✔
491
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
492
  int32_t code = TSDB_CODE_SUCCESS;
229,376,093✔
493
  int32_t lino = 0;
229,376,093✔
494
  if (!pInfo->timeWindowInterpo) {
229,376,093✔
495
    return code;
218,131,391✔
496
  }
497

498
  if (pBlock == NULL) {
11,244,702!
499
    code = TSDB_CODE_INVALID_PARA;
×
500
    return code;
×
501
  }
502

503
  if (pBlock->pDataBlock == NULL) {
11,244,702!
504
    return code;
×
505
  }
506

507
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
11,244,702✔
508

509
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
11,412,409✔
510
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
11,412,409✔
511
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
11,412,370✔
512
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
11,412,354✔
513
    if (interp) {
11,412,238!
514
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
11,412,239✔
515
    }
516
  } else {
517
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
16✔
518
  }
519

520
  // point interpolation does not require the end key time window interpolation.
521
  // interpolation query does not generate the time window end interpolation
522
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
11,412,192✔
523
  if (!done) {
11,412,154✔
524
    int32_t endRowIndex = startPos + forwardRows - 1;
11,412,147✔
525
    int32_t nextRowIndex = endRowIndex + 1;
11,412,147✔
526

527
    // duplicated ts row does not involve in the interpolation of end value for current time window
528
    int32_t x = endRowIndex;
11,412,147✔
529
    while (x > 0) {
11,412,165✔
530
      if (tsCols[x] == tsCols[x - 1]) {
11,389,334✔
531
        x -= 1;
18✔
532
      } else {
533
        endRowIndex = x;
11,389,316✔
534
        break;
11,389,316✔
535
      }
536
    }
537

538
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
11,412,147✔
539
    bool  interp = false;
11,412,147✔
540
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols,
11,412,147✔
541
                                           endKey, win, &interp);
542
    QUERY_CHECK_CODE(code, lino, _end);
11,412,271!
543
    if (interp) {
11,412,271✔
544
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
11,257,475✔
545
    }
546
  } else {
547
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
7✔
548
  }
549

550
_end:
11,412,239✔
551
  if (code != TSDB_CODE_SUCCESS) {
11,412,239!
552
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
553
  }
554
  return code;
11,412,234✔
555
}
556

557
/**
 * Remember, for every tracked column, the last non-NULL value found in the block.
 *
 * @param pPrevKeys array of SGroupKeys that receive the values (one entry per tracked column)
 * @param pBlock    block to read from; nothing is done when it has no column array
 * @param pCols     array of SColumn, parallel to pPrevKeys, giving the slot id of each column
 *
 * An entry is left untouched when every row of its column is NULL.
 */
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  size_t numOfKeys = taosArrayGetSize(pPrevKeys);
  for (int32_t idx = 0; idx < numOfKeys; ++idx) {
    SColumn*         pCol = taosArrayGet(pCols, idx);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    SGroupKeys*      pSaved = taosArrayGet(pPrevKeys, idx);

    // walk backwards to the last row that holds a value
    int32_t row = pBlock->info.rows - 1;
    while (row >= 0 && colDataIsNull_s(pColData, row)) {
      --row;
    }

    if (row < 0) {
      continue;
    }

    char* src = colDataGetData(pColData, row);
    if (IS_VAR_DATA_TYPE(pSaved->type)) {
      // variable-length data: copy the length given by varDataTLen(src)
      memcpy(pSaved->pData, src, varDataTLen(src));
    } else {
      memcpy(pSaved->pData, src, pSaved->bytes);
    }
  }
}
585

586
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
54,269✔
587
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
588
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
54,269✔
589

590
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
54,269✔
591
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
54,269✔
592

593
  int32_t startPos = 0;
54,269✔
594
  int32_t numOfOutput = pSup->numOfExprs;
54,269✔
595

596
  SResultRow* pResult = NULL;
54,269✔
597

598
  while (1) {
×
599
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
54,269✔
600
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
54,269✔
601
    uint64_t            groupId = pOpenWin->groupId;
54,269✔
602
    SResultRowPosition* p1 = &pOpenWin->pos;
54,269✔
603
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
54,269!
604
      break;
54,269✔
605
    }
606

607
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
608
    if (NULL == pr) {
×
609
      T_LONG_JMP(pTaskInfo->env, terrno);
×
610
    }
611

612
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
613
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
614
      T_LONG_JMP(pTaskInfo->env, terrno);
×
615
    }
616

617
    if (pr->closed) {
×
618
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
619
             isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)) ) {
×
620
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
621
        T_LONG_JMP(pTaskInfo->env, terrno);
×
622
      }
623
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
624
      taosMemoryFree(pNode);
×
625
      continue;
×
626
    }
627

628
    STimeWindow w = pr->win;
×
629
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
630
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
631
    if (ret != TSDB_CODE_SUCCESS) {
×
632
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
×
633
    }
634

635
    if(isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
636
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
637
      T_LONG_JMP(pTaskInfo->env, terrno);
×
638
    }
639

640
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
641
    if (!pTsKey) {
×
642
      pTaskInfo->code = terrno;
×
643
      T_LONG_JMP(pTaskInfo->env, terrno);
×
644
    }
645

646
    int64_t     prevTs = *(int64_t*)pTsKey->pData;
×
647
    if (groupId == pBlock->info.id.groupId) {
×
648
      TSKEY curTs = pBlock->info.window.skey;
×
649
      if (tsCols != NULL) {
×
650
        curTs = tsCols[startPos];
×
651
      }
652
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
653
                                RESULT_ROW_END_INTERP, pSup);
654
    }
655

656
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
657
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
658

659
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
660
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
661
                                          pBlock->info.rows, numOfExprs);
×
662
    if (ret != TSDB_CODE_SUCCESS) {
×
663
      T_LONG_JMP(pTaskInfo->env, ret);
×
664
    }
665

666
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
667
      closeResultRow(pr);
×
668
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
669
      taosMemoryFree(pNode);
×
670
    } else {  // the remains are can not be closed yet.
671
      break;
×
672
    }
673
  }
674
}
54,269✔
675

676
static bool tsKeyCompFn(void* l, void* r, void* param) {
12,610,193✔
677
  TSKEY*                    lTS = (TSKEY*)l;
12,610,193✔
678
  TSKEY*                    rTS = (TSKEY*)r;
12,610,193✔
679
  SIntervalAggOperatorInfo* pInfo = param;
12,610,193✔
680
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
12,610,193✔
681
}
682

683
// Tell whether the result-row hash table already holds an entry for the window starting at
// win->skey in group tableGroupId.
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
  char winKey[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
  SET_RES_WINDOW_KEY(winKey, (char*)&win->skey, sizeof(TSKEY), tableGroupId);

  void* pEntry = tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, winKey, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
  return pEntry != NULL;
}
688

689
/**
690
 * @brief check if cur window should be filtered out by limit info
691
 * @retval true if should be filtered out
692
 * @retval false if not filtering out
693
 * @note If no limit info, we skip filtering.
694
 *       If input/output ts order mismatch, we skip filtering too.
695
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
696
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
697
 *       every tuple in every block.
698
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
699
 */
700
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId, SExecTaskInfo* pTaskInfo) {
230,036,174✔
701
  int32_t code = TSDB_CODE_SUCCESS;
230,036,174✔
702
  int32_t lino = 0;
230,036,174✔
703
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
230,036,174✔
704
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
10,890,403✔
705
      // if input/output ts order mismatch, no filter
706
  ) {
707
    return false;
226,342,705✔
708
  }
709

710
  if (pOperatorInfo->limit == 0) return true;
3,693,469✔
711

712
  if (pOperatorInfo->pBQ == NULL) {
3,692,854✔
713
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosMemoryFree, pOperatorInfo);
22,355✔
714
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
22,354!
715
  }
716

717
  bool shouldFilter = false;
3,692,853✔
718
  // if BQ has been full, compare it with top of BQ
719
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
3,692,853✔
720
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
1,587,205✔
721
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
1,585,019✔
722
  }
723
  if (shouldFilter) {
3,661,939✔
724
    return true;
94,396✔
725
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
3,567,543✔
726
    return false;
2,066,242✔
727
  }
728

729
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
730
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
1,575,209✔
731
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
1,620,018!
732

733
  *((TSKEY*)node.data) = win->skey;
1,620,018✔
734

735
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
1,620,018✔
736
    taosMemoryFree(node.data);
11✔
737
    return true;
×
738
  }
739

740
_end:
1,504,976✔
741
  if (code != TSDB_CODE_SUCCESS) {
1,504,976!
742
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
743
    pTaskInfo->code = code;
×
744
    T_LONG_JMP(pTaskInfo->env, code);
×
745
  }
746
  return false;
1,504,976✔
747
}
748

749
/**
 * @brief Aggregate one input block into every interval window that the block overlaps.
 *
 * The first window is derived from the block start ts, the following ones are found with
 * getNextQualifiedWindow() until the block is exhausted or a window is rejected by the limit filter.
 * Any failure is reported with a long jump on pTaskInfo->env carrying the failing call's error code.
 *
 * @param pOperatorInfo  the interval operator
 * @param pResultRowInfo result row bookkeeping (holds the open window list)
 * @param pBlock         input data block
 * @param scanFlag       scan flag of the block, compared against MAIN_SCAN
 * @retval true  the block belongs to a new group beyond slimit, nothing was aggregated
 * @retval false otherwise
 */
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  // a new group starts: enforce slimit and reset the per-group limit queue
  if (tableGroupId != pInfo->curGroupId) {
    pInfo->handledGroupNum += 1;
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
      return true;
    } else {
      pInfo->curGroupId = tableGroupId;
      destroyBoundedQueue(pInfo->pBQ);
      pInfo->pBQ = NULL;
    }
  }

  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;

  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // forward the real failure; only a missing result row without an error code is reported as OOM
    if (ret == TSDB_CODE_SUCCESS) {
      ret = TSDB_CODE_OUT_OF_MEMORY;
    }
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 pInfo->binfo.inputTsOrder);

  // prev time window not interpolation yet.
  if (pInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                        pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  doCloseWindow(pResultRowInfo, pInfo, pResult);

  // walk the remaining windows covered by this block
  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      pInfo->binfo.inputTsOrder);
    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
      break;
    }

    ret = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                 pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
      // same policy as for the first window: keep the original error code when there is one
      if (ret == TSDB_CODE_SUCCESS) {
        ret = TSDB_CODE_OUT_OF_MEMORY;
      }
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           pInfo->binfo.inputTsOrder);
    // window start(end) key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    // TODO: add to open window? how to close the open windows after input blocks exhausted?

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                          pBlock->info.rows, numOfOutput);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    doCloseWindow(pResultRowInfo, pInfo, pResult);
  }

  // remember the last row of this block for the interpolation of the next block
  if (pInfo->timeWindowInterpo) {
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }
  return false;
}
864

865
/**
 * @brief Close the result row once its end border has been interpolated.
 *
 * Only acts when time window interpolation is enabled for the operator and the result row carries the
 * RESULT_ROW_END_INTERP flag; in that case the row is closed and the head of the open window list is
 * popped and released.
 */
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }
  if (!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    return;
  }

  // current result is done in computing final results.
  closeResultRow(pResult);

  SListNode* pHead = tdListPopHead(pResultRowInfo->openWindow);
  taosMemoryFree(pHead);
}
230,123,199✔
873

874
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
54,269✔
875
                                       SExecTaskInfo* pTaskInfo) {
876
  int32_t         code = TSDB_CODE_SUCCESS;
54,269✔
877
  int32_t         lino = 0;
54,269✔
878
  SOpenWindowInfo openWin = {0};
54,269✔
879
  openWin.pos.pageId = pResult->pageId;
54,269✔
880
  openWin.pos.offset = pResult->offset;
54,269✔
881
  openWin.groupId = groupId;
54,269✔
882
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
54,269✔
883
  if (pn == NULL) {
54,269✔
884
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
54,261✔
885
    QUERY_CHECK_CODE(code, lino, _end);
54,261!
886
    return openWin.pos;
54,261✔
887
  }
888

889
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
8✔
890
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
8!
891
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
892
    QUERY_CHECK_CODE(code, lino, _end);
×
893
  }
894

895
_end:
8✔
896
  if (code != TSDB_CODE_SUCCESS) {
8!
897
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
898
    pTaskInfo->code = code;
×
899
    T_LONG_JMP(pTaskInfo->env, code);
×
900
  }
901
  return openWin.pos;
8✔
902
}
903

904
/**
 * @brief Fetch the primary timestamp column data of a block.
 *
 * When the first timestamp is non-zero and the block window is still {0, 0}, the block window is
 * refreshed from the timestamp column. A first timestamp equal to 0 only produces a warning.
 *
 * @return the timestamp array, or NULL when the block has no column array or its data is not loaded
 * @note Failures are stored in pTaskInfo->code and reported with a long jump.
 */
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  if (pTsColInfo == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  TSKEY* tsCols = (int64_t*)pTsColInfo->pData;
  if (tsCols[0] == 0) {
    qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0], tsCols[pBlock->info.rows - 1]);
    return tsCols;
  }

  // the block window has not been set yet, derive it from the ts column
  if (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0) {
    int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
931

932
/**
 * @brief Open the interval operator: consume all downstream blocks, aggregate them into interval
 *        windows, and prepare the grouped result info for output.
 * @return TSDB_CODE_SUCCESS; failures are logged, stored in pTaskInfo->code and reported with a long jump
 * @note Returns immediately when the operator is already opened.
 */
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                pSup = &pOperator->exprSupp;
  int32_t                   scanFlag = MAIN_SCAN;
  int64_t                   st = taosGetTimestampUs();

  pInfo->cleanGroupResInfo = false;

  SSDataBlock* pBlock = NULL;
  while ((pBlock = getNextBlockFromDownstream(pOperator, 0)) != NULL) {
    scanFlag = pBlock->info.scanFlag;
    pInfo->binfo.pRes->info.scanFlag = scanFlag;

    // evaluate the scalar expressions in place before the aggregation
    SExprSupp* pScalarSup = &pInfo->scalarSupp;
    if (pScalarSup->pExprInfo != NULL) {
      code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                   NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    // hashIntervalAgg() returns true when no more input needs to be read
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) {
      break;
    }
  }

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  OPTR_SET_OPENED(pOperator);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
985

986
/**
 * @brief Split one input block into state windows and aggregate each of them.
 *
 * Rows are scanned in order; rows whose state column is NULL are skipped. A window is (re)started when the
 * group id differs from the tracked one or no state key has been kept yet. A row whose state value equals
 * the kept key extends the current window; a different value flushes the current window into its result
 * row and starts a new one. After the scan the still open window is flushed with the last timestamp of the
 * block as its end key.
 *
 * @param pOperator the state window operator
 * @param pInfo     operator runtime info (state key, row tracking, aggregation support)
 * @param pBlock    input data block
 * @note Failures are reported with a long jump on pTaskInfo->env carrying the failing call's error code.
 */
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
  if (!pStateColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }
  int64_t gid = pBlock->info.id.groupId;

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int32_t bytes = pStateColInfoData->info.bytes;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (!pColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;

  // an empty block has nothing to aggregate, and tsList[rows - 1] below would be out of bounds
  if (pBlock->info.rows <= 0) {
    return;
  }

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  // the pre-aggregation entry of the state column does not change while scanning the rows
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
      continue;
    }

    char* val = colDataGetData(pStateColInfoData, j);

    if (gid != pRowSup->groupId || !pInfo->hasKey) {
      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }

      pInfo->hasKey = true;

      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
    } else if (compareVal(val, &pInfo->stateKey)) {
      doKeepTuple(pRowSup, tsList[j], gid);
    } else {  // a new state window started
      SResultRow* pResult = NULL;

      // keep the time window for the closed time window.
      STimeWindow window = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
      if (ret != TSDB_CODE_SUCCESS) {  // forward the real failure instead of a generic app error
        T_LONG_JMP(pTaskInfo->env, ret);
      }

      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
      if (ret != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, ret);
      }

      // here we start a new state window
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);

      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }
    }
  }

  // flush the window that is still open at the end of the block
  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {  // forward the real failure instead of a generic app error
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
67,155✔
1083

1084
/**
 * @brief Open the state window operator: consume all downstream blocks, aggregate them into state
 *        windows, and prepare the grouped result info for output.
 * @return TSDB_CODE_SUCCESS; every failure is logged, stored in pTaskInfo->code and reported with a
 *         long jump
 * @note Returns immediately when the operator is already opened.
 */
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = pInfo->binfo.inputTsOrder;
  int64_t    st = taosGetTimestampUs();

  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      // routed through _end like every other failure here, so it is logged as well
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doStateWindowAggImpl(pOperator, pInfo, pBlock);
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;
  pOperator->status = OP_RES_TO_RETURN;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1139

1140
/**
 * @brief Produce the next result block of the state window operator.
 * @param pOperator the state window operator
 * @param ppRes     receives the result block, or NULL when the block holds no rows or the operator is done
 * @return TSDB_CODE_SUCCESS; failures are logged, stored in pTaskInfo->code and reported with a long jump
 */
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  // keep building until some rows survive the filter or no results remain
  for (;;) {
    doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pBInfo->pRes->info.rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pBInfo->pRes;
  }
  return code;
}
1185

1186
/**
 * @brief Produce the next result block of the interval operator.
 * @param pOperator the interval operator
 * @param ppRes     receives the result block, or NULL when the block holds no rows or the operator is done
 * @return TSDB_CODE_SUCCESS; failures are logged, stored in pTaskInfo->code and reported with a long jump
 */
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  size_t                    rows = 0;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pBlock = pInfo->binfo.pRes;
  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  // keep building until some rows survive the filter or no results remain
  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pBlock->info.rows > 0) {
      break;
    }
  }

  rows = pBlock->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pBlock;
  }
  return code;
}
1229

1230
static void destroyStateWindowOperatorInfo(void* param) {
74,522✔
1231
  if (param == NULL) {
74,522!
1232
    return;
×
1233
  }
1234
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
74,522✔
1235
  cleanupBasicInfo(&pInfo->binfo);
74,522✔
1236
  taosMemoryFreeClear(pInfo->stateKey.pData);
74,522!
1237
  if (pInfo->pOperator) {
74,522!
1238
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
74,522✔
1239
                      pInfo->cleanGroupResInfo);
74,522✔
1240
    pInfo->pOperator = NULL;
74,521✔
1241
  }
1242

1243
  cleanupExprSupp(&pInfo->scalarSup);
74,521✔
1244
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
74,521✔
1245
  cleanupAggSup(&pInfo->aggSup);
74,522✔
1246
  cleanupGroupResInfo(&pInfo->groupResInfo);
74,522✔
1247

1248
  taosMemoryFreeClear(param);
74,522!
1249
}
1250

1251
static void freeItem(void* param) {
189,374✔
1252
  SGroupKeys* pKey = (SGroupKeys*)param;
189,374✔
1253
  taosMemoryFree(pKey->pData);
189,374✔
1254
}
189,374✔
1255

1256
void destroyIntervalOperatorInfo(void* param) {
2,033,720✔
1257
  if (param == NULL) {
2,033,720!
1258
    return;
×
1259
  }
1260

1261
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
2,033,720✔
1262

1263
  cleanupBasicInfo(&pInfo->binfo);
2,033,720✔
1264
  if (pInfo->pOperator) {
2,033,726!
1265
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
2,033,734✔
1266
                      pInfo->cleanGroupResInfo);
2,033,734✔
1267
    pInfo->pOperator = NULL;
2,033,624✔
1268
  }
1269

1270
  cleanupAggSup(&pInfo->aggSup);
2,033,616✔
1271
  cleanupExprSupp(&pInfo->scalarSupp);
2,033,798✔
1272

1273
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
2,033,704✔
1274

1275
  taosArrayDestroy(pInfo->pInterpCols);
2,033,698✔
1276
  pInfo->pInterpCols = NULL;
2,033,667✔
1277

1278
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
2,033,667✔
1279
  pInfo->pPrevValues = NULL;
2,033,647✔
1280

1281
  cleanupGroupResInfo(&pInfo->groupResInfo);
2,033,647✔
1282
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
2,033,741✔
1283
  destroyBoundedQueue(pInfo->pBQ);
2,033,833✔
1284
  taosMemoryFreeClear(param);
2,033,756!
1285
}
1286

1287
/**
 * @brief Detect whether any function of the operator needs interval border interpolation and, if so,
 *        set up the bookkeeping for it.
 *
 * When at least one function is an interval-interpolation function, pInfo->pInterpCols and
 * pInfo->pPrevValues are created. The first entry of both arrays describes the primary timestamp column,
 * followed by one entry per interpolation function (built from the column of its first parameter). Each
 * pPrevValues entry owns a zeroed buffer of the column's byte width.
 *
 * @param pCtx      function contexts
 * @param numOfCols number of entries in pCtx
 * @param pInfo     interval operator info that receives the arrays
 * @param pRes      set to true when interpolation is needed (also written on failure)
 * @return TSDB_CODE_SUCCESS, or the error code of the failed allocation
 * @note On failure the arrays created so far stay attached to pInfo.
 */
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
                                      bool* pRes) {
  bool    needed = false;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   tmp = NULL;

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
      needed = true;
      break;
    }
  }

  if (needed) {
    pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
    QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);

    pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
    QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);

    {  // ts column
      SColumn c = {0};
      c.colId = 1;
      c.slotId = pInfo->primaryTsIndex;
      c.type = TSDB_DATA_TYPE_TIMESTAMP;
      c.bytes = sizeof(int64_t);
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      // zero-initialized so that no indeterminate field is copied into the array
      SGroupKeys key = {0};
      key.bytes = c.bytes;
      key.type = c.type;
      key.isNull = true;  // to denote no value is assigned yet
      key.pData = taosMemoryCalloc(1, c.bytes);
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

      tmp = taosArrayPush(pInfo->pPrevValues, &key);
      if (tmp == NULL) {
        // the buffer was not handed over to the array, release it here
        code = terrno;
        lino = __LINE__;
        taosMemoryFree(key.pData);
        goto _end;
      }
    }
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SExprInfo* pExpr = pCtx[i].pExpr;

    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
      SFunctParam* pParam = &pExpr->base.pParam[0];

      SColumn c = *pParam->pCol;
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      SGroupKeys key = {0};
      key.bytes = c.bytes;
      key.type = c.type;
      key.isNull = false;
      key.pData = taosMemoryCalloc(1, c.bytes);
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

      tmp = taosArrayPush(pInfo->pPrevValues, &key);
      if (tmp == NULL) {
        // the buffer was not handed over to the array, release it here
        code = terrno;
        lino = __LINE__;
        taosMemoryFree(key.pData);
        goto _end;
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  *pRes = needed;
  return code;
}
1360

1361
// Creates the "TimeIntervalAggOperator" (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL) from an interval physical node.
//
// On success the new operator is stored in *pOptrInfo and TSDB_CODE_SUCCESS is returned.
// On failure pInfo is destroyed, pOperator and downstream are handed to destroyOperatorAndDownstreams(), the error
// code is stored in pTaskInfo->code, logged together with the failing line, and returned.
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pSup->hasWindowOrGroup = true;

  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 512);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                    &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  SInterval interval = {.interval = pPhyNode->interval,
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision};

  STimeWindowAggSupp as = {
      .waterMark = pPhyNode->window.watermark,
      .calTrigger = pPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
  };

  pInfo->win = pTaskInfo->window;
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
  pInfo->interval = interval;
  pInfo->twAggSup = as;
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;

  // LIMIT/SLIMIT are stored as "limit + offset", i.e. the total number of rows/groups that has to be produced.
  if (pPhyNode->window.node.pLimit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
    pInfo->limited = true;
    pInfo->limit = pLimit->limit + pLimit->offset;
  }
  if (pPhyNode->window.node.pSlimit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
    pInfo->slimited = true;
    pInfo->slimit = pLimit->limit + pLimit->offset;
    pInfo->curGroupId = UINT64_MAX;
  }

  // optional scalar expressions attached to the window node
  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->timeWindowInterpo = false;
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
  QUERY_CHECK_CODE(code, lino, _error);
  if (pInfo->timeWindowInterpo) {
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
    // An allocation failure must surface as an error code; jumping to _error with code still set to
    // TSDB_CODE_SUCCESS would report success without producing an operator.
    QUERY_CHECK_NULL(pInfo->binfo.resultRowInfo.openWindow, code, lino, _error, terrno);
  }

  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);

  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }

  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
1486

1487
// todo handle multiple timeline cases. assume no timeline interweaving
1488
// Splits the rows of pBlock into session windows and applies the aggregate functions to each closed window.
// A row joins the currently open window when its timestamp is within pInfo->gap of the previous row's timestamp
// (in either direction); otherwise the open window is flushed and a new one starts at that row.
// The window still open at the end of the block is flushed as well. Errors are reported via T_LONG_JMP.
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*      pSup = &pOperator->exprSupp;
  SWindowRowsSup* pRowSup = &pInfo->winSup;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (pTsCol == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int64_t gid = pBlock->info.id.groupId;
  int64_t gap = pInfo->gap;
  int32_t ret = TSDB_CODE_SUCCESS;

  // first block seen by this operator: no previous timestamp is known yet
  if (!pInfo->reptScan) {
    pInfo->reptScan = true;
    pRowSup->prevTs = INT64_MIN;
  }

  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  // In case of ascending or descending order scan data, only one time window needs to be kept for each table.
  TSKEY* tsList = (TSKEY*)pTsCol->pData;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    // new group, or nothing seen so far: open a window at this row
    if (gid != pRowSup->groupId || pRowSup->prevTs == INT64_MIN) {
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
      continue;
    }

    bool inSession = ((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
                     ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap));
    if (inSession) {
      // The gap is less than the threshold, so the row belongs to the session window that is already open.
      doKeepTuple(pRowSup, tsList[j], gid);
      continue;
    }

    // The gap is exceeded: flush the rows collected for the previous session window, if any.
    if (pRowSup->numOfRows > 0) {
      SResultRow* pClosed = NULL;

      // work on a copy of the closed window
      STimeWindow closedWin = pRowSup->win;

      ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &closedWin, masterScan, &pClosed, gid, pSup->pCtx,
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
      }

      // pRowSup->numOfRows rows starting at pRowSup->startRowIndex belong to the closed window
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &closedWin, 0);
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows,
                                            numOfOutput);
      if (ret != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, ret);
      }
    }

    // here we start a new session window
    doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
    doKeepTuple(pRowSup, tsList[j], gid);
  }

  // flush the window that is still open at the end of the block
  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, pSup->pCtx,
                               numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
110,428✔
1568

1569
// Produces the next result block of the session window operator.
// First call: consumes every block from the downstream operator, aggregates it, then starts emitting results.
// Later calls (status OP_RES_TO_RETURN): keep emitting the remaining results.
// *ppRes is set to NULL when the produced block is empty or the operator is already done.
// On failure the error is logged, stored in pTaskInfo->code and raised through T_LONG_JMP.
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
  SExprSupp*               pSup = &pOperator->exprSupp;

  pInfo->cleanGroupResInfo = false;
  if (pOperator->status == OP_RES_TO_RETURN) {
    // keep building result blocks until one survives the filter or nothing is left
    for (;;) {
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      if (!hasRemainResults(&pInfo->groupResInfo)) {
        setOperatorCompleted(pOperator);
        break;
      }

      if (pBInfo->pRes->info.rows > 0) {
        break;
      }
    }
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
    return code;
  }

  int64_t st = taosGetTimestampUs();
  int32_t order = pInfo->binfo.inputTsOrder;

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  // drain the downstream operator and aggregate every block
  SSDataBlock* pBlock = NULL;
  while ((pBlock = getNextBlockFromDownstream(pOperator, 0)) != NULL) {
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  // restore the value
  pOperator->status = OP_RES_TO_RETURN;

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  // emit the first result block
  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
  return code;
}
1668

1669
// todo: make this a non-blocking operator
1670
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
74,521✔
1671
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1672
  QRY_PARAM_CHECK(pOptrInfo);
74,521!
1673

1674
  int32_t                   code = TSDB_CODE_SUCCESS;
74,521✔
1675
  int32_t                   lino = 0;
74,521✔
1676
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
74,521✔
1677
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
74,520✔
1678
  if (pInfo == NULL || pOperator == NULL) {
74,522!
1679
    code = terrno;
×
1680
    goto _error;
×
1681
  }
1682

1683
  pOperator->exprSupp.hasWindowOrGroup = true;
74,522✔
1684
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
74,522✔
1685
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
74,522✔
1686

1687
  if (pStateNode->window.pExprs != NULL) {
74,522✔
1688
    int32_t    numOfScalarExpr = 0;
4,531✔
1689
    SExprInfo* pScalarExprInfo = NULL;
4,531✔
1690
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
4,531✔
1691
    QUERY_CHECK_CODE(code, lino, _error);
4,531!
1692

1693
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
4,531✔
1694
    if (code != TSDB_CODE_SUCCESS) {
4,531!
1695
      goto _error;
×
1696
    }
1697
  }
1698

1699
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
74,522✔
1700
  pInfo->stateKey.type = pInfo->stateCol.type;
74,519✔
1701
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
74,519✔
1702
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
74,519✔
1703
  if (pInfo->stateKey.pData == NULL) {
74,521!
1704
    goto _error;
×
1705
  }
1706
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
74,521✔
1707
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
74,521✔
1708

1709
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
74,521✔
1710
  if (code != TSDB_CODE_SUCCESS) {
74,520!
1711
    goto _error;
×
1712
  }
1713

1714
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
74,520✔
1715

1716
  int32_t    num = 0;
74,520✔
1717
  SExprInfo* pExprInfo = NULL;
74,520✔
1718
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
74,520✔
1719
  QUERY_CHECK_CODE(code, lino, _error);
74,522!
1720

1721
  initResultSizeInfo(&pOperator->resultInfo, 4096);
74,522✔
1722

1723
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
74,522✔
1724
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
74,522✔
1725
  if (code != TSDB_CODE_SUCCESS) {
74,522!
1726
    goto _error;
×
1727
  }
1728

1729
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
74,522✔
1730
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
74,522!
1731
  initBasicInfo(&pInfo->binfo, pResBlock);
74,522✔
1732
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
74,522✔
1733

1734
  pInfo->twAggSup =
74,522✔
1735
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
74,522✔
1736

1737
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
74,522✔
1738
  QUERY_CHECK_CODE(code, lino, _error);
74,522!
1739

1740
  pInfo->tsSlotId = tsSlotId;
74,522✔
1741
  pInfo->pOperator = pOperator;
74,522✔
1742
  pInfo->cleanGroupResInfo = false;
74,522✔
1743
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
74,522✔
1744
                  pTaskInfo);
1745
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
74,521✔
1746
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1747

1748
  code = appendDownstream(pOperator, &downstream, 1);
74,520✔
1749
  if (code != TSDB_CODE_SUCCESS) {
74,522!
1750
    goto _error;
×
1751
  }
1752

1753
  *pOptrInfo = pOperator;
74,522✔
1754
  return TSDB_CODE_SUCCESS;
74,522✔
1755

1756
_error:
×
1757
  if (pInfo != NULL) {
×
1758
    destroyStateWindowOperatorInfo(pInfo);
×
1759
  }
1760

1761
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1762
  pTaskInfo->code = code;
×
1763
  return code;
×
1764
}
1765

1766
void destroySWindowOperatorInfo(void* param) {
99,771✔
1767
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
99,771✔
1768
  if (pInfo == NULL) {
99,771!
1769
    return;
×
1770
  }
1771

1772
  cleanupBasicInfo(&pInfo->binfo);
99,771✔
1773
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
99,771✔
1774
  if (pInfo->pOperator) {
99,771!
1775
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
99,771✔
1776
                      pInfo->cleanGroupResInfo);
99,771✔
1777
    pInfo->pOperator = NULL;
99,771✔
1778
  }
1779

1780
  cleanupAggSup(&pInfo->aggSup);
99,771✔
1781
  cleanupExprSupp(&pInfo->scalarSupp);
99,770✔
1782

1783
  cleanupGroupResInfo(&pInfo->groupResInfo);
99,770✔
1784
  taosMemoryFreeClear(param);
99,771!
1785
}
1786

1787
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
99,768✔
1788
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1789
  QRY_PARAM_CHECK(pOptrInfo);
99,768!
1790

1791
  int32_t                  code = TSDB_CODE_SUCCESS;
99,768✔
1792
  int32_t                  lino = 0;
99,768✔
1793
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
99,768✔
1794
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
99,770✔
1795
  if (pInfo == NULL || pOperator == NULL) {
99,770!
1796
    code = terrno;
×
1797
    goto _error;
×
1798
  }
1799

1800
  pOperator->exprSupp.hasWindowOrGroup = true;
99,770✔
1801

1802
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
99,770✔
1803
  initResultSizeInfo(&pOperator->resultInfo, 4096);
99,770✔
1804

1805
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
99,770✔
1806
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
99,771!
1807
  initBasicInfo(&pInfo->binfo, pResBlock);
99,771✔
1808

1809
  int32_t      numOfCols = 0;
99,771✔
1810
  SExprInfo*   pExprInfo = NULL;
99,771✔
1811
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
99,771✔
1812
  QUERY_CHECK_CODE(code, lino, _error);
99,769!
1813

1814
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
99,769✔
1815
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
99,769✔
1816
  QUERY_CHECK_CODE(code, lino, _error);
99,770!
1817

1818
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
99,770✔
1819
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
99,770✔
1820
  pInfo->gap = pSessionNode->gap;
99,770✔
1821

1822
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
99,770✔
1823
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
99,770✔
1824
  QUERY_CHECK_CODE(code, lino, _error);
99,770!
1825

1826
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
99,770✔
1827
  pInfo->binfo.pRes = pResBlock;
99,770✔
1828
  pInfo->winSup.prevTs = INT64_MIN;
99,770✔
1829
  pInfo->reptScan = false;
99,770✔
1830
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
99,770✔
1831
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
99,770✔
1832

1833
  if (pSessionNode->window.pExprs != NULL) {
99,770✔
1834
    int32_t    numOfScalar = 0;
1✔
1835
    SExprInfo* pScalarExprInfo = NULL;
1✔
1836
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
1✔
1837
    QUERY_CHECK_CODE(code, lino, _error);
1!
1838

1839
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
1✔
1840
    QUERY_CHECK_CODE(code, lino, _error);
1!
1841
  }
1842

1843
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
99,770✔
1844
  QUERY_CHECK_CODE(code, lino, _error);
99,769!
1845

1846
  pInfo->pOperator = pOperator;
99,769✔
1847
  pInfo->cleanGroupResInfo = false;
99,769✔
1848
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
99,769✔
1849
                  pInfo, pTaskInfo);
1850
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
99,769✔
1851
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1852
  pOperator->pTaskInfo = pTaskInfo;
99,768✔
1853
  code = appendDownstream(pOperator, &downstream, 1);
99,768✔
1854
  QUERY_CHECK_CODE(code, lino, _error);
99,769!
1855

1856
  *pOptrInfo = pOperator;
99,769✔
1857
  return TSDB_CODE_SUCCESS;
99,769✔
1858

1859
_error:
×
1860
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
1861
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1862
  pTaskInfo->code = code;
×
1863
  return code;
×
1864
}
1865

1866
void destroyMAIOperatorInfo(void* param) {
545,162✔
1867
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
545,162✔
1868
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
545,162✔
1869
  taosMemoryFreeClear(param);
545,164!
1870
}
545,164✔
1871

1872
// Fetches a new result row from pSup->pResultBuf and, when that succeeds, records its page id and offset as the
// current position of pResultRowInfo. Returns the row, or NULL when no row could be obtained.
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
  SResultRow* pRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  if (pRow != NULL) {
    pResultRowInfo->cur = (SResultRowPosition){.pageId = pRow->pageId, .offset = pRow->offset};
  }
  return pRow;
}
1880

1881
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
67,589,756✔
1882
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
1883
  if (*pResult == NULL) {
67,589,756✔
1884
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
349,603✔
1885
    if (*pResult == NULL) {
349,601!
1886
      return terrno;
×
1887
    }
1888
  }
1889

1890
  // set time window for current result
1891
  (*pResult)->win = (*win);
67,589,754✔
1892
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
67,589,754✔
1893
}
1894

1895
// Aggregates one input block for the merge-aligned interval operator.
// Rows are grouped by identical timestamp: every run of rows sharing the same timestamp forms one window whose
// start is that timestamp and whose end is start + interval - 1. When the timestamp changes, the finished window
// is finalized into pResultBlock and the single reusable result row is reset for the next window.
// The last window of the block stays open (miaInfo->curTs) so it can continue in the next block.
// Errors are reported via T_LONG_JMP.
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
  SInterval*     pInterval = &iaInfo->interval;

  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);

  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

  // a window is still open from the previous block
  if (miaInfo->curTs != INT64_MIN) {
    if (ts != miaInfo->curTs) {
      // this block starts at a different timestamp: close the open window first
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
      miaInfo->curTs = ts;
    }
  } else {
    miaInfo->curTs = ts;
  }

  STimeWindow currWin = {0};
  currWin.skey = miaInfo->curTs;
  currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;

  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  int32_t currPos = startPos;
  while (++currPos < pBlock->info.rows) {
    if (tsCols[currPos] == miaInfo->curTs) {
      continue;
    }

    // timestamp changed: aggregate rows [startPos, currPos) into the current window and emit it
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
    miaInfo->curTs = tsCols[currPos];

    currWin.skey = miaInfo->curTs;
    currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;

    startPos = currPos;
    // Stamp the result row with the window that was just opened (currWin), not with the first window of the
    // block, so the row's window always matches the rows aggregated into it.
    ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  // aggregate the trailing rows into the window that remains open
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
433,063✔
1967

1968
// Called once the results of a group have been generated: tags pRes with the finished group's id, then clears
// the per-group state (no open window, group id back to 0).
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
  // INT64_MIN marks "no window is open"
  pMiaInfo->curTs = INT64_MIN;

  // hand the group id over to the result block before it is cleared
  pRes->info.id.groupId = pMiaInfo->groupId;
  pMiaInfo->groupId = 0;
}
371,095✔
1973

1974
// Pulls blocks from the downstream operator and aggregates them into the result block of the wrapped interval
// operator info. The loop stops when
//   - the downstream is exhausted (the last open window is closed and the operator is marked completed),
//   - a block of a different group arrives (it is kept in prefetchedBlock and the finished group is returned,
//     unless the filter removed all of its rows), or
//   - the result block has reached pOperator->resultInfo.capacity rows.
// On failure the error is logged, stored in pTaskInfo->code and raised through T_LONG_JMP.
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
  int32_t                               code = TSDB_CODE_SUCCESS;
  int32_t                               lino = 0;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;

  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;

  while (1) {
    SSDataBlock* pBlock = NULL;
    if (pMiaInfo->prefetchedBlock == NULL) {
      pBlock = getNextBlockFromDownstream(pOperator, 0);
    } else {
      // resume with the block that was held back when the previous group ended
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;

      pMiaInfo->groupId = pBlock->info.id.groupId;
    }

    // no data exists, all query processing is done
    if (pBlock == NULL) {
      // close last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      setOperatorCompleted(pOperator);
      break;
    }

    if (pMiaInfo->groupId == 0) {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    } else {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        // if there are unclosed time window, close it firstly.
        if (pMiaInfo->curTs == INT64_MIN) {
          // Raise the error that was just recorded; terrno is unrelated here and may be 0 or stale.
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
        }
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));

        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
        if (pRes->info.rows == 0) {
          // After filtering for last group, the result is empty, so we need to continue to process next group
          continue;
        } else {
          break;
        }
      } else {
        // same group as before: keep accumulating
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
636,461✔
2064

2065
// Produces the next output block of the merge-aligned interval operator.
//
// pOperator: operator whose `info` is a SMergeAlignedIntervalAggOperatorInfo.
// ppRes:     receives the operator's result block, or NULL when the operator has
//            already finished or the block ended up with no rows.
// Returns TSDB_CODE_SUCCESS on every path that returns.
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SMergeAlignedIntervalAggOperatorInfo* pMergeInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pInterval = pMergeInfo->intervalAggOperatorInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // the result block is reused across calls, so start from an empty one
  SSDataBlock* pOut = pInterval->binfo.pRes;
  blockDataCleanup(pOut);

  if (!pInterval->binfo.mergeResultBlock) {
    // a single aggregation round per call
    doMergeAlignedIntervalAgg(pOperator);
  } else {
    // keep aggregating until the operator is done or the row threshold is reached
    while (pOperator->status != OP_EXEC_DONE && pOut->info.rows < pOperator->resultInfo.threshold) {
      doMergeAlignedIntervalAgg(pOperator);
    }
  }

  size_t numOfRows = pOut->info.rows;
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pOut;
  }
  return TSDB_CODE_SUCCESS;
}
2099

2100
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
545,153✔
2101
                                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2102
  QRY_PARAM_CHECK(pOptrInfo);
545,153!
2103

2104
  int32_t                               code = TSDB_CODE_SUCCESS;
545,153✔
2105
  int32_t                               lino = 0;
545,153✔
2106
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
545,153✔
2107
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
545,168✔
2108
  if (miaInfo == NULL || pOperator == NULL) {
545,155!
2109
    code = terrno;
×
2110
    goto _error;
×
2111
  }
2112

2113
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
545,156✔
2114
  if (miaInfo->intervalAggOperatorInfo == NULL) {
545,151!
2115
    code = terrno;
×
2116
    goto _error;
×
2117
  }
2118

2119
  SInterval interval = {.interval = pNode->interval,
545,151✔
2120
                        .sliding = pNode->sliding,
545,151✔
2121
                        .intervalUnit = pNode->intervalUnit,
545,151✔
2122
                        .slidingUnit = pNode->slidingUnit,
545,151✔
2123
                        .offset = pNode->offset,
545,151✔
2124
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision};
545,151✔
2125

2126
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
545,151✔
2127
  SExprSupp*                pSup = &pOperator->exprSupp;
545,151✔
2128
  pSup->hasWindowOrGroup = true;
545,151✔
2129

2130
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
545,151✔
2131
  QUERY_CHECK_CODE(code, lino, _error);
545,163!
2132

2133
  miaInfo->curTs = INT64_MIN;
545,163✔
2134
  iaInfo->win = pTaskInfo->window;
545,163✔
2135
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
545,163✔
2136
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
545,163✔
2137
  iaInfo->interval = interval;
545,163✔
2138
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
545,163✔
2139
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
545,163✔
2140

2141
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
545,163✔
2142
  initResultSizeInfo(&pOperator->resultInfo, 512);
545,163✔
2143

2144
  int32_t    num = 0;
545,164✔
2145
  SExprInfo* pExprInfo = NULL;
545,164✔
2146
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
545,164✔
2147
  QUERY_CHECK_CODE(code, lino, _error);
545,163!
2148

2149
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
545,163✔
2150
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
545,163✔
2151
  QUERY_CHECK_CODE(code, lino, _error);
545,167!
2152

2153
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
545,167✔
2154
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
545,165!
2155
  initBasicInfo(&iaInfo->binfo, pResBlock);
545,165✔
2156
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
545,166✔
2157
  QUERY_CHECK_CODE(code, lino, _error);
545,166!
2158

2159
  iaInfo->timeWindowInterpo = false;
545,166✔
2160
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
545,166✔
2161
  QUERY_CHECK_CODE(code, lino, _error);
545,158!
2162
  if (iaInfo->timeWindowInterpo) {
545,158!
2163
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2164
  }
2165

2166
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
545,158✔
2167
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
545,155✔
2168
  QUERY_CHECK_CODE(code, lino, _error);
545,164!
2169
  iaInfo->pOperator = pOperator;
545,164✔
2170
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
545,164✔
2171
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2172

2173
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
545,165✔
2174
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2175

2176
  code = appendDownstream(pOperator, &downstream, 1);
545,165✔
2177
  QUERY_CHECK_CODE(code, lino, _error);
545,166!
2178

2179
  *pOptrInfo = pOperator;
545,166✔
2180
  return TSDB_CODE_SUCCESS;
545,166✔
2181

2182
_error:
×
2183
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2184
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2185
  pTaskInfo->code = code;
×
2186
  return code;
×
2187
}
2188

2189
//=====================================================================================================================
2190
// merge interval operator
2191
typedef struct SMergeIntervalAggOperatorInfo {
2192
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2193
  SList*                   groupIntervals;
2194
  SListIter                groupIntervalsIter;
2195
  bool                     hasGroupId;
2196
  uint64_t                 groupId;
2197
  SSDataBlock*             prefetchedBlock;
2198
  bool                     inputBlocksFinished;
2199
} SMergeIntervalAggOperatorInfo;
2200

2201
typedef struct SGroupTimeWindow {
2202
  uint64_t    groupId;
2203
  STimeWindow window;
2204
} SGroupTimeWindow;
2205

2206
void destroyMergeIntervalOperatorInfo(void* param) {
×
2207
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2208
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2209
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2210

2211
  taosMemoryFreeClear(param);
×
2212
}
×
2213

2214
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2215
                                        STimeWindow* newWin) {
2216
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2217
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2218
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2219

2220
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2221
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2222
  if (code != TSDB_CODE_SUCCESS) {
×
2223
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2224
    return code;
×
2225
  }
2226

2227
  SListIter iter = {0};
×
2228
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2229
  SListNode* listNode = NULL;
×
2230
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2231
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2232
    if (prevGrpWin->groupId != tableGroupId) {
×
2233
      continue;
×
2234
    }
2235

2236
    STimeWindow* prevWin = &prevGrpWin->window;
×
2237
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2238
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2239
      taosMemoryFreeClear(tmp);
×
2240
    }
2241
  }
2242

2243
  return TSDB_CODE_SUCCESS;
×
2244
}
2245

2246
// Aggregates one input block into every interval window it overlaps, for the merge interval operator.
//
// The window holding the block's start timestamp is processed first; further windows are taken from
// getNextQualifiedWindow() until it returns a negative position. Each window is closed with
// doCloseWindow() and then registered through outputPrevIntervalResult().
//
// pOperatorInfo:  merge interval operator (its `info` is a SMergeIntervalAggOperatorInfo).
// pResultRowInfo: result row bookkeeping passed to the window helpers.
// pBlock:         input block to aggregate.
// scanFlag:       scan flag of the block; compared against MAIN_SCAN when selecting the output buffer.
// pResultBlock:   only forwarded to outputPrevIntervalResult().
//
// Nothing is returned: every failure leaves through T_LONG_JMP on pTaskInfo->env, carrying the
// code reported by the failing helper.
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
                                        iaInfo->binfo.inputTsOrder);

  int32_t ret =
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // keep the helper's own error; fall back to out-of-memory only when it reported
    // success but produced no result row
    int32_t err = (ret != TSDB_CODE_SUCCESS) ? ret : TSDB_CODE_OUT_OF_MEMORY;
    T_LONG_JMP(pTaskInfo->env, err);
  }

  // the boundary key of the window in scan direction
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 iaInfo->binfo.inputTsOrder);
  if (forwardRows <= 0) {
    // the first window was derived from the block's start timestamp, so it must hold at least one row
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  // prev time window not interpolation yet.
  if (iaInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        forwardRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
  doCloseWindow(pResultRowInfo, iaInfo, pResult);

  // output previous interval results after this interval (&win) is closed
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // walk the remaining windows covered by this block
  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      iaInfo->binfo.inputTsOrder);
    if (startPos < 0) {
      break;
    }

    // null data, failed to allocate more memory buffer
    code =
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      int32_t err = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
      T_LONG_JMP(pTaskInfo->env, err);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           iaInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                           forwardRows, pBlock->info.rows, numOfOutput);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    doCloseWindow(pResultRowInfo, iaInfo, pResult);

    // output previous interval results after this interval (&nextWin) is closed
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  if (iaInfo->timeWindowInterpo) {
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
  }
}
×
2360

2361
// Produces the next output block of the merge interval operator.
//
// While input remains, blocks of a single group are pulled from downstream 0 and aggregated with
// doMergeIntervalAggImpl(); a block of a different group is parked in prefetchedBlock for the next
// call. Once the input is exhausted, each call takes the next entry of groupIntervals to label the
// result block's group id. The operator is marked completed when the result block has no rows.
//
// ppRes: receives the result block, or NULL when the operator is done or the block has no rows.
// Returns TSDB_CODE_SUCCESS; failures leave through T_LONG_JMP after setting pTaskInfo->code.
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  // initialised up front: the `goto _end` error paths must not skip the initialisation
  // of a variable that is read after the label
  size_t         rows = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!miaInfo->inputBlocksFinished) {
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = getNextBlockFromDownstream(pOperator, 0);
      } else {
        // resume with the block parked by the previous call; it starts a new group
        pBlock = miaInfo->prefetchedBlock;
        miaInfo->groupId = pBlock->info.id.groupId;
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
        // input exhausted: prepare to walk the recorded group windows
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
        // group boundary: keep the block for the next call
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

      pRes->info.scanFlag = pBlock->info.scanFlag;
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

    pRes->info.id.groupId = miaInfo->groupId;
  }

  if (miaInfo->inputBlocksFinished) {
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);

    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
      pRes->info.id.groupId = grpWin->groupId;
    }
  }

  if (pRes->info.rows == 0) {
    setOperatorCompleted(pOperator);
  }

  rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pRes;
  return code;
}
2445

2446
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2447
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2448
  QRY_PARAM_CHECK(pOptrInfo);
×
2449

2450
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2451
  int32_t                        lino = 0;
×
2452
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2453
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2454
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2455
    code = terrno;
×
2456
    goto _error;
×
2457
  }
2458

2459
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2460
                        .sliding = pIntervalPhyNode->sliding,
×
2461
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2462
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2463
                        .offset = pIntervalPhyNode->offset,
×
2464
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
×
2465

2466
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2467

2468
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2469
  pIntervalInfo->win = pTaskInfo->window;
×
2470
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2471
  pIntervalInfo->interval = interval;
×
2472
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2473
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2474
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2475

2476
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2477
  pExprSupp->hasWindowOrGroup = true;
×
2478

2479
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2480
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2481

2482
  int32_t    num = 0;
×
2483
  SExprInfo* pExprInfo = NULL;
×
2484
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2485
  QUERY_CHECK_CODE(code, lino, _error);
×
2486

2487
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2488
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2489
  if (code != TSDB_CODE_SUCCESS) {
×
2490
    goto _error;
×
2491
  }
2492

2493
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2494
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2495
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2496
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2497
  QUERY_CHECK_CODE(code, lino, _error);
×
2498

2499
  pIntervalInfo->timeWindowInterpo = false;
×
2500
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2501
  QUERY_CHECK_CODE(code, lino, _error);
×
2502
  if (pIntervalInfo->timeWindowInterpo) {
×
2503
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2504
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2505
      goto _error;
×
2506
    }
2507
  }
2508

2509
  pIntervalInfo->pOperator = pOperator;
×
2510
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2511
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2512
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2513
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2514
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2515

2516
  code = appendDownstream(pOperator, &downstream, 1);
×
2517
  if (code != TSDB_CODE_SUCCESS) {
×
2518
    goto _error;
×
2519
  }
2520

2521
  *pOptrInfo = pOperator;
×
2522
  return TSDB_CODE_SUCCESS;
×
2523
_error:
×
2524
  if (pMergeIntervalInfo != NULL) {
×
2525
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2526
  }
2527
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2528
  pTaskInfo->code = code;
×
2529
  return code;
×
2530
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc