• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3549

06 Dec 2024 09:44AM UTC coverage: 59.948% (+0.1%) from 59.846%
#3549

push

travis-ci

web-flow
Merge pull request #29057 from taosdata/docs/TD-33031-3.0

docs: description of user privileges

118833 of 254191 branches covered (46.75%)

Branch coverage included in aggregate %.

199893 of 277480 relevant lines covered (72.04%)

19006119.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.22
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "tchecksum.h"
22
#include "tcommon.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "tfill.h"
26
#include "tglobal.h"
27
#include "tlog.h"
28
#include "ttime.h"
29

30
typedef struct SSessionAggOperatorInfo {
31
  SOptrBasicInfo     binfo;
32
  SAggSupporter      aggSup;
33
  SExprSupp          scalarSupp;  // supporter for perform scalar function
34
  SGroupResInfo      groupResInfo;
35
  SWindowRowsSup     winSup;
36
  bool               reptScan;  // next round scan
37
  int64_t            gap;       // session window gap
38
  int32_t            tsSlotId;  // primary timestamp slot id
39
  STimeWindowAggSupp twAggSup;
40
  SOperatorInfo*     pOperator;
41
  bool               cleanGroupResInfo;
42
} SSessionAggOperatorInfo;
43

44
typedef struct SStateWindowOperatorInfo {
45
  SOptrBasicInfo     binfo;
46
  SAggSupporter      aggSup;
47
  SExprSupp          scalarSup;
48
  SGroupResInfo      groupResInfo;
49
  SWindowRowsSup     winSup;
50
  SColumn            stateCol;  // start row index
51
  bool               hasKey;
52
  SStateKeys         stateKey;
53
  int32_t            tsSlotId;  // primary timestamp column slot id
54
  STimeWindowAggSupp twAggSup;
55
  SOperatorInfo*     pOperator;
56
  bool               cleanGroupResInfo;
57
} SStateWindowOperatorInfo;
58

59
typedef enum SResultTsInterpType {
60
  RESULT_ROW_START_INTERP = 1,
61
  RESULT_ROW_END_INTERP = 2,
62
} SResultTsInterpType;
63

64
typedef struct SOpenWindowInfo {
65
  SResultRowPosition pos;
66
  uint64_t           groupId;
67
} SOpenWindowInfo;
68

69
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
70

71
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
72
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
73
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
74

75
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
281,489,191✔
76
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
77
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
78
                                      SExecTaskInfo* pTaskInfo) {
79
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
281,489,191✔
80
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
81

82
  if (pResultRow == NULL || pTaskInfo->code != 0) {
283,767,689!
83
    *pResult = NULL;
×
84
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
85
    return pTaskInfo->code;
×
86
  }
87

88
  // set time window for current result
89
  pResultRow->win = (*win);
283,856,210✔
90

91
  *pResult = pResultRow;
283,856,210✔
92
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
283,856,210✔
93
}
94

95
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
175,072,090✔
96
  pRowSup->win.ekey = ts;
175,072,090✔
97
  pRowSup->prevTs = ts;
175,072,090✔
98
  pRowSup->numOfRows += 1;
175,072,090✔
99
  pRowSup->groupId = groupId;
175,072,090✔
100
}
175,072,090✔
101

102
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
146,842,348✔
103
                                     uint64_t groupId) {
104
  pRowSup->startRowIndex = rowIndex;
146,842,348✔
105
  pRowSup->numOfRows = 0;
146,842,348✔
106
  pRowSup->win.skey = tsList[rowIndex];
146,842,348✔
107
  pRowSup->groupId = groupId;
146,842,348✔
108
}
146,842,348✔
109

110
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
×
111
                                            int32_t order, int64_t* pData) {
112
  int32_t forwardRows = 0;
262,405,228✔
113

114
  if (order == TSDB_ORDER_ASC) {
×
115
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
191,870,067✔
116
    if (end >= 0) {
189,544,740!
117
      forwardRows = end;
189,695,378✔
118

119
      while (pData[end + pos] == ekey) {
191,813,947!
120
        forwardRows += 1;
2,118,569✔
121
        ++pos;
2,118,569✔
122
      }
123
    }
124
  } else {
125
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
70,535,161✔
126
    if (end >= 0) {
71,306,436!
127
      forwardRows = end;
71,322,457✔
128

129
      while (pData[end + pos] == ekey) {
89,306,139!
130
        forwardRows += 1;
17,983,682✔
131
        ++pos;
17,983,682✔
132
      }
133
    }
134
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
135
    //    if (end >= 0) {
136
    //      forwardRows = pos - end;
137
    //
138
    //      if (pData[end] == ekey) {
139
    //        forwardRows += 1;
140
    //      }
141
    //    }
142
  }
143

144
  return forwardRows;
260,851,176✔
145
}
146

147
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
396,455,883✔
148
  int32_t midPos = -1;
396,455,883✔
149
  int32_t numOfRows;
150

151
  if (num <= 0) {
396,455,883!
152
    return -1;
×
153
  }
154

155
  TSKEY*  keyList = (TSKEY*)pValue;
396,455,883✔
156
  int32_t firstPos = 0;
396,455,883✔
157
  int32_t lastPos = num - 1;
396,455,883✔
158

159
  if (order == TSDB_ORDER_DESC) {
396,455,883✔
160
    // find the first position which is smaller than the key
161
    while (1) {
162
      if (key >= keyList[firstPos]) return firstPos;
678,507,585✔
163
      if (key == keyList[lastPos]) return lastPos;
630,005,420✔
164

165
      if (key < keyList[lastPos]) {
629,295,336✔
166
        lastPos += 1;
67,577,196✔
167
        if (lastPos >= num) {
67,577,196!
168
          return -1;
×
169
        } else {
170
          return lastPos;
67,577,196✔
171
        }
172
      }
173

174
      numOfRows = lastPos - firstPos + 1;
561,718,140✔
175
      midPos = (numOfRows >> 1) + firstPos;
561,718,140✔
176

177
      if (key < keyList[midPos]) {
561,718,140✔
178
        firstPos = midPos + 1;
137,103,902✔
179
      } else if (key > keyList[midPos]) {
424,614,238!
180
        lastPos = midPos - 1;
426,841,282✔
181
      } else {
182
        break;
×
183
      }
184
    }
185

186
  } else {
187
    // find the first position which is bigger than the key
188
    while (1) {
189
      if (key <= keyList[firstPos]) return firstPos;
2,054,489,246✔
190
      if (key == keyList[lastPos]) return lastPos;
1,973,344,382✔
191

192
      if (key > keyList[lastPos]) {
1,971,689,929✔
193
        lastPos = lastPos + 1;
191,632,130✔
194
        if (lastPos >= num)
191,632,130!
195
          return -1;
×
196
        else
197
          return lastPos;
191,632,130✔
198
      }
199

200
      numOfRows = lastPos - firstPos + 1;
1,780,057,799✔
201
      midPos = (numOfRows >> 1u) + firstPos;
1,780,057,799✔
202

203
      if (key < keyList[midPos]) {
1,780,057,799✔
204
        lastPos = midPos - 1;
1,441,138,517✔
205
      } else if (key > keyList[midPos]) {
338,919,282✔
206
        firstPos = midPos + 1;
331,457,247✔
207
      } else {
208
        break;
7,462,035✔
209
      }
210
    }
211
  }
212

213
  return midPos;
5,234,991✔
214
}
215

216
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
268,127,387✔
217
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
218
  int32_t num = -1;
268,127,387✔
219
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
268,127,387✔
220

221
  if (order == TSDB_ORDER_ASC) {
268,127,387✔
222
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
195,359,727!
223
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
190,848,123!
224
      if (item != NULL) {
189,544,740!
225
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
226
      }
227
    } else {
228
      num = pDataBlockInfo->rows - startPos;
4,511,604✔
229
      if (item != NULL) {
4,511,604!
230
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
231
      }
232
    }
233
  } else {  // desc
234
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
72,767,660!
235
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
71,557,105!
236
      if (item != NULL) {
71,306,436!
237
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
238
      }
239
    } else {
240
      num = pDataBlockInfo->rows - startPos;
1,210,555✔
241
      if (item != NULL) {
1,210,555!
242
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
243
      }
244
    }
245
  }
246

247
  return num;
266,573,335✔
248
}
249

250
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
17,713,644✔
251
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
252
  SqlFunctionCtx* pCtx = pSup->pCtx;
17,713,644✔
253

254
  int32_t index = 1;
17,713,644✔
255
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
37,690,536✔
256
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
19,975,933✔
257
      pCtx[k].start.key = INT64_MIN;
2,254,789✔
258
      continue;
2,254,789✔
259
    }
260

261
    SFunctParam*     pParam = &pCtx[k].param[0];
17,721,100✔
262
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
17,721,100✔
263

264
    double v1 = 0, v2 = 0, v = 0;
17,720,280✔
265
    if (prevRowIndex == -1) {
17,720,280!
266
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
×
267
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData);
1!
268
    } else {
269
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex));
17,723,906!
270
    }
271

272
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));
17,723,907!
273

274
#if 0
275
    if (functionId == FUNCTION_INTERP) {
276
      if (type == RESULT_ROW_START_INTERP) {
277
        pCtx[k].start.key = prevTs;
278
        pCtx[k].start.val = v1;
279

280
        pCtx[k].end.key = curTs;
281
        pCtx[k].end.val = v2;
282

283
        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_VARBINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR ||
284
            pColInfo->info.type == TSDB_DATA_TYPE_GEOMETRY) {
285
          if (prevRowIndex == -1) {
286
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
287
          } else {
288
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
289
          }
290

291
          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
292
        }
293
      }
294
    } else if (functionId == FUNCTION_TWA) {
295
#endif
296

297
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
17,723,907✔
298
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
17,723,907✔
299
    SPoint point = (SPoint){.key = windowKey, .val = &v};
17,723,907✔
300

301
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
17,723,907✔
302
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
9,308,199✔
303
    }
304

305
    if (type == RESULT_ROW_START_INTERP) {
17,722,103✔
306
      pCtx[k].start.key = point.key;
8,732,787✔
307
      pCtx[k].start.val = v;
8,732,787✔
308
    } else {
309
      pCtx[k].end.key = point.key;
8,989,316✔
310
      pCtx[k].end.val = v;
8,989,316✔
311
    }
312

313
    index += 1;
17,722,103✔
314
  }
315
#if 0
316
  }
317
#endif
318
}
17,714,603✔
319

320
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
1,133,868✔
321
  if (type == RESULT_ROW_START_INTERP) {
1,133,868✔
322
    for (int32_t k = 0; k < numOfOutput; ++k) {
1,524,092✔
323
      pCtx[k].start.key = INT64_MIN;
828,962✔
324
    }
325
  } else {
326
    for (int32_t k = 0; k < numOfOutput; ++k) {
968,958✔
327
      pCtx[k].end.key = INT64_MIN;
530,220✔
328
    }
329
  }
330
}
1,133,868✔
331

332
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
9,411,543✔
333
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
334
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
9,411,543✔
335

336
  TSKEY curTs = tsCols[pos];
9,411,543✔
337

338
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
9,411,543✔
339
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
9,411,213✔
340

341
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
342
  // start exactly from this point, no need to do interpolation
343
  TSKEY key = ascQuery ? win->skey : win->ekey;
9,411,213!
344
  if (key == curTs) {
9,411,213✔
345
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
519,094✔
346
    return true;
519,094✔
347
  }
348

349
  // it is the first time window, no need to do interpolation
350
  if (pTsKey->isNull && pos == 0) {
8,892,119!
351
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
163,982✔
352
  } else {
353
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
8,728,137!
354
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
8,728,137✔
355
                              RESULT_ROW_START_INTERP, pSup);
356
  }
357

358
  return true;
8,893,022✔
359
}
360

361
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
9,424,137✔
362
                                            int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
363
                                            TSKEY blockEkey, STimeWindow* win, bool* pRes) {
364
  int32_t code = TSDB_CODE_SUCCESS;
9,424,137✔
365
  int32_t lino = 0;
9,424,137✔
366
  int32_t order = pInfo->binfo.inputTsOrder;
9,424,137✔
367

368
  TSKEY actualEndKey = tsCols[endRowIndex];
9,424,137✔
369
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
9,424,137!
370

371
  // not ended in current data block, do not invoke interpolation
372
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
9,424,137!
373
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
177,358✔
374
    (*pRes) = false;
177,894✔
375
    return code;
177,894✔
376
  }
377

378
  // there is actual end point of current time window, no interpolation needs
379
  if (key == actualEndKey) {
9,246,779✔
380
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
260,849✔
381
    (*pRes) = true;
260,849✔
382
    return code;
260,849✔
383
  }
384

385
  if (nextRowIndex < 0) {
8,985,930!
386
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
387
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
388
  }
389

390
  TSKEY nextKey = tsCols[nextRowIndex];
8,985,930✔
391
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
8,985,930✔
392
                            RESULT_ROW_END_INTERP, pSup);
393
  (*pRes) = true;
8,985,605✔
394
  return code;
8,985,605✔
395
}
396

397
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
280,785,223✔
398
  if (pInterval->interval != pInterval->sliding &&
280,785,223✔
399
      ((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart))) {
136,973,841!
400
    return false;
75✔
401
  }
402

403
  return true;
280,785,148✔
404
}
405

406
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
280,710,271✔
407
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
280,710,271✔
408
}
409

410
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
269,935,121✔
411
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
412
  bool ascQuery = (order == TSDB_ORDER_ASC);
269,935,121✔
413

414
  int32_t precision = pInterval->precision;
269,935,121✔
415
  getNextTimeWindow(pInterval, pNext, order);
269,935,121✔
416

417
  // next time window is not in current block
418
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
268,633,849!
419
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
265,750,397!
420
    return -1;
3,411,075✔
421
  }
422

423
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
265,222,774!
424
    return -1;
30✔
425
  }
426

427
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
265,190,934✔
428
  int32_t startPos = 0;
265,190,934✔
429

430
  // tumbling time window query, a special case of sliding time window query
431
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
265,190,934!
432
    startPos = prevPosition + 1;
128,337,155✔
433
  } else {
434
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
136,853,779✔
435
      startPos = 0;
2,863,418✔
436
    } else {
437
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
133,990,361✔
438
    }
439
  }
440

441
  /* interp query with fill should not skip time window */
442
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
443
  //    return startPos;
444
  //  }
445

446
  /*
447
   * This time window does not cover any data, try next time window,
448
   * this case may happen when the time window is too small
449
   */
450
  if (primaryKeys != NULL) {
263,560,182✔
451
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
334,394,370✔
452
      TSKEY next = primaryKeys[startPos];
70,944,722✔
453
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
70,944,722!
454
        pNext->skey = taosTimeTruncate(next, pInterval);
2,879✔
455
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
456
      } else {
457
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
70,941,843✔
458
        pNext->skey = pNext->ekey - pInterval->interval + 1;
70,941,843✔
459
      }
460
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
192,476,346✔
461
      TSKEY next = primaryKeys[startPos];
31,114,587✔
462
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
31,114,587!
463
        pNext->skey = taosTimeTruncate(next, pInterval);
×
464
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
465
      } else {
466
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
31,128,361✔
467
        pNext->ekey = pNext->skey + pInterval->interval - 1;
31,128,361✔
468
      }
469
    }
470
  }
471

472
  return startPos;
263,602,536✔
473
}
474

475
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
28,269,181✔
476
  if (type == RESULT_ROW_START_INTERP) {
28,269,181✔
477
    return pResult->startInterp == true;
9,423,657✔
478
  } else {
479
    return pResult->endInterp == true;
18,845,524✔
480
  }
481
}
482

483
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
18,657,397✔
484
  if (type == RESULT_ROW_START_INTERP) {
18,657,397✔
485
    pResult->startInterp = true;
9,412,031✔
486
  } else {
487
    pResult->endInterp = true;
9,245,366✔
488
  }
489
}
18,657,397✔
490

491
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
251,079,372✔
492
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
493
  int32_t code = TSDB_CODE_SUCCESS;
251,079,372✔
494
  int32_t lino = 0;
251,079,372✔
495
  if (!pInfo->timeWindowInterpo) {
251,079,372✔
496
    return code;
242,273,180✔
497
  }
498

499
  if (pBlock == NULL) {
8,806,192!
500
    code = TSDB_CODE_INVALID_PARA;
×
501
    return code;
×
502
  }
503

504
  if (pBlock->pDataBlock == NULL) {
8,806,192!
505
    return code;
×
506
  }
507

508
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
8,806,192✔
509

510
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
9,423,656✔
511
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
9,423,656✔
512
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
9,423,628✔
513
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
9,411,574✔
514
    if (interp) {
9,412,029!
515
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
9,412,034✔
516
    }
517
  } else {
518
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
12,054✔
519
  }
520

521
  // point interpolation does not require the end key time window interpolation.
522
  // interpolation query does not generate the time window end interpolation
523
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
9,424,058✔
524
  if (!done) {
9,424,077!
525
    int32_t endRowIndex = startPos + forwardRows - 1;
9,424,085✔
526
    int32_t nextRowIndex = endRowIndex + 1;
9,424,085✔
527

528
    // duplicated ts row does not involve in the interpolation of end value for current time window
529
    int32_t x = endRowIndex;
9,424,085✔
530
    while (x > 0) {
9,424,103✔
531
      if (tsCols[x] == tsCols[x - 1]) {
9,384,568✔
532
        x -= 1;
18✔
533
      } else {
534
        endRowIndex = x;
9,384,550✔
535
        break;
9,384,550✔
536
      }
537
    }
538

539
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
9,424,085!
540
    bool  interp = false;
9,424,085✔
541
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols,
9,424,085✔
542
                                           endKey, win, &interp);
543
    QUERY_CHECK_CODE(code, lino, _end);
9,424,295!
544
    if (interp) {
9,424,295✔
545
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
9,246,411✔
546
    }
547
  } else {
548
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
549
  }
550

551
_end:
9,424,247✔
552
  if (code != TSDB_CODE_SUCCESS) {
9,424,247!
553
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
554
  }
555
  return code;
9,424,203✔
556
}
557

558
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
61,552✔
559
  if (pBlock->pDataBlock == NULL) {
61,552!
560
    return;
×
561
  }
562

563
  size_t num = taosArrayGetSize(pPrevKeys);
61,552✔
564
  for (int32_t k = 0; k < num; ++k) {
185,404✔
565
    SColumn* pc = taosArrayGet(pCols, k);
123,852✔
566

567
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);
123,852✔
568

569
    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
123,852✔
570
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
123,852!
571
      if (colDataIsNull_s(pColInfo, i)) {
247,704!
572
        continue;
×
573
      }
574

575
      char* val = colDataGetData(pColInfo, i);
123,852!
576
      if (IS_VAR_DATA_TYPE(pkey->type)) {
123,852!
577
        memcpy(pkey->pData, val, varDataTLen(val));
×
578
      } else {
579
        memcpy(pkey->pData, val, pkey->bytes);
123,852✔
580
      }
581

582
      break;
123,852✔
583
    }
584
  }
585
}
586

587
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
61,552✔
588
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
589
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
61,552✔
590

591
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
61,552✔
592
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
61,552✔
593

594
  int32_t startPos = 0;
61,552✔
595
  int32_t numOfOutput = pSup->numOfExprs;
61,552✔
596

597
  SResultRow* pResult = NULL;
61,552✔
598

599
  while (1) {
1✔
600
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
61,553✔
601
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
61,553✔
602
    uint64_t            groupId = pOpenWin->groupId;
61,553✔
603
    SResultRowPosition* p1 = &pOpenWin->pos;
61,553✔
604
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
61,553!
605
      break;
61,552✔
606
    }
607

608
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
1✔
609
    if (NULL == pr) {
1!
610
      T_LONG_JMP(pTaskInfo->env, terrno);
×
611
    }
612

613
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
1!
614
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
615
      T_LONG_JMP(pTaskInfo->env, terrno);
×
616
    }
617

618
    if (pr->closed) {
1!
619
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
620
             isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)) ) {
×
621
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
622
        T_LONG_JMP(pTaskInfo->env, terrno);
×
623
      }
624
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
625
      taosMemoryFree(pNode);
×
626
      continue;
×
627
    }
628

629
    STimeWindow w = pr->win;
1✔
630
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
1✔
631
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
632
    if (ret != TSDB_CODE_SUCCESS) {
1!
633
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
×
634
    }
635

636
    if(isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
1!
637
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
638
      T_LONG_JMP(pTaskInfo->env, terrno);
×
639
    }
640

641
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
1✔
642
    if (!pTsKey) {
1!
643
      pTaskInfo->code = terrno;
×
644
      T_LONG_JMP(pTaskInfo->env, terrno);
×
645
    }
646

647
    int64_t     prevTs = *(int64_t*)pTsKey->pData;
1✔
648
    if (groupId == pBlock->info.id.groupId) {
1!
649
      TSKEY curTs = pBlock->info.window.skey;
1✔
650
      if (tsCols != NULL) {
1!
651
        curTs = tsCols[startPos];
1✔
652
      }
653
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
1✔
654
                                RESULT_ROW_END_INTERP, pSup);
655
    }
656

657
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
1✔
658
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
1✔
659

660
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
1✔
661
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
1✔
662
                                          pBlock->info.rows, numOfExprs);
1✔
663
    if (ret != TSDB_CODE_SUCCESS) {
1!
664
      T_LONG_JMP(pTaskInfo->env, ret);
×
665
    }
666

667
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
1!
668
      closeResultRow(pr);
1✔
669
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
1✔
670
      taosMemoryFree(pNode);
1✔
671
    } else {  // the remains are can not be closed yet.
672
      break;
×
673
    }
674
  }
675
}
61,552✔
676

677
static bool tsKeyCompFn(void* l, void* r, void* param) {
12,553,392✔
678
  TSKEY*                    lTS = (TSKEY*)l;
12,553,392✔
679
  TSKEY*                    rTS = (TSKEY*)r;
12,553,392✔
680
  SIntervalAggOperatorInfo* pInfo = param;
12,553,392✔
681
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
12,553,392✔
682
}
683

684
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
3,567,338✔
685
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
3,567,338✔
686
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
3,567,338✔
687
  return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
3,567,338✔
688
}
689

690
/**
691
 * @brief check if cur window should be filtered out by limit info
692
 * @retval true if should be filtered out
693
 * @retval false if not filtering out
694
 * @note If no limit info, we skip filtering.
695
 *       If input/output ts order mismatch, we skip filtering too.
696
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
697
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
698
 *       every tuple in every block.
699
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
700
 */
701
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId, SExecTaskInfo* pTaskInfo) {
251,979,200✔
702
  int32_t code = TSDB_CODE_SUCCESS;
251,979,200✔
703
  int32_t lino = 0;
251,979,200✔
704
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
251,979,200✔
705
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
11,337,169✔
706
      // if input/output ts order mismatch, no filter
707
  ) {
708
    return false;
248,291,301✔
709
  }
710

711
  if (pOperatorInfo->limit == 0) return true;
3,687,899✔
712

713
  if (pOperatorInfo->pBQ == NULL) {
3,687,283✔
714
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosMemoryFree, pOperatorInfo);
22,253✔
715
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
22,258!
716
  }
717

718
  bool shouldFilter = false;
3,687,288✔
719
  // if BQ has been full, compare it with top of BQ
720
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
3,687,288✔
721
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
1,599,762✔
722
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
1,597,928✔
723
  }
724
  if (shouldFilter) {
3,665,562✔
725
    return true;
94,668✔
726
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
3,570,894✔
727
    return false;
2,084,151✔
728
  }
729

730
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
731
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
1,580,222✔
732
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
1,623,270!
733

734
  *((TSKEY*)node.data) = win->skey;
1,623,270✔
735

736
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
1,623,270!
737
    taosMemoryFree(node.data);
×
738
    return true;
×
739
  }
740

741
_end:
1,505,966✔
742
  if (code != TSDB_CODE_SUCCESS) {
1,505,966!
743
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
744
    pTaskInfo->code = code;
×
745
    T_LONG_JMP(pTaskInfo->env, code);
×
746
  }
747
  return false;
1,505,966✔
748
}
749

750
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
2,286,676✔
751
                            int32_t scanFlag) {
752
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
2,286,676✔
753

754
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
2,286,676✔
755
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
2,286,676✔
756

757
  int32_t     startPos = 0;
2,286,676✔
758
  int32_t     numOfOutput = pSup->numOfExprs;
2,286,676✔
759
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
2,286,676✔
760
  uint64_t    tableGroupId = pBlock->info.id.groupId;
2,286,556✔
761
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
2,286,556✔
762
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
2,286,556✔
763
  SResultRow* pResult = NULL;
2,286,521✔
764

765
  if (tableGroupId != pInfo->curGroupId) {
2,286,521✔
766
    pInfo->handledGroupNum += 1;
117,942✔
767
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
117,942✔
768
      return true;
138✔
769
    } else {
770
      pInfo->curGroupId = tableGroupId;
117,804✔
771
      destroyBoundedQueue(pInfo->pBQ);
117,804✔
772
      pInfo->pBQ = NULL;
117,794✔
773
    }
774
  }
775

776
  STimeWindow win =
777
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
2,286,373✔
778
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;
2,286,317✔
779

780
  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
2,199,846✔
781
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
782
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
2,199,839!
783
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
×
784
  }
785

786
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
2,199,841✔
787
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
2,199,841✔
788
                                                 pInfo->binfo.inputTsOrder);
789

790
  // prev time window not interpolation yet.
791
  if (pInfo->timeWindowInterpo) {
2,199,774✔
792
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
61,552✔
793
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
61,552✔
794

795
    // restore current time window
796
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
61,552✔
797
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
798
    if (ret != TSDB_CODE_SUCCESS) {
61,552!
799
      T_LONG_JMP(pTaskInfo->env, ret);
×
800
    }
801

802
    // window start key interpolation
803
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
61,552✔
804
    if (ret != TSDB_CODE_SUCCESS) {
61,551!
805
      T_LONG_JMP(pTaskInfo->env, ret);
×
806
    }
807
  }
808

809
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
2,199,773✔
810
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
2,199,749✔
811
                                  pBlock->info.rows, numOfOutput);
2,199,749✔
812
  if (ret != TSDB_CODE_SUCCESS) {
2,199,807!
813
    T_LONG_JMP(pTaskInfo->env, ret);
×
814
  }
815

816
  doCloseWindow(pResultRowInfo, pInfo, pResult);
2,199,807✔
817

818
  STimeWindow nextWin = win;
2,199,776✔
819
  while (1) {
252,363,238✔
820
    int32_t prevEndPos = forwardRows - 1 + startPos;
254,563,014✔
821
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
254,563,014✔
822
                                      pInfo->binfo.inputTsOrder);
823
    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
251,376,634!
824
      break;
825
    }
826
    // null data, failed to allocate more memory buffer
827
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
249,952,427✔
828
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
829
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
250,191,195!
830
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
×
831
    }
832

833
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
250,283,064✔
834
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
250,283,064✔
835
                                           pInfo->binfo.inputTsOrder);
836
    // window start(end) key interpolation
837
    code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
249,713,198✔
838
    if (code != TSDB_CODE_SUCCESS) {
251,581,746!
839
      T_LONG_JMP(pTaskInfo->env, code);
×
840
    }
841
    // TODO: add to open window? how to close the open windows after input blocks exhausted?
842
#if 0
843
    if ((ascScan && ekey <= pBlock->info.window.ekey) ||
844
        (!ascScan && ekey >= pBlock->info.window.skey)) {
845
      // window start(end) key interpolation
846
      doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
847
    } else if (pInfo->timeWindowInterpo) {
848
      addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
849
    }
850
#endif
851
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
251,581,746✔
852
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
251,777,290✔
853
                                          pBlock->info.rows, numOfOutput);
251,777,290✔
854
    if (ret != TSDB_CODE_SUCCESS) {
252,439,464!
855
      T_LONG_JMP(pTaskInfo->env, ret);
×
856
    }
857
    doCloseWindow(pResultRowInfo, pInfo, pResult);
252,439,464✔
858
  }
859

860
  if (pInfo->timeWindowInterpo) {
1,940,738✔
861
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
61,552✔
862
  }
863
  return false;
2,199,773✔
864
}
865

866
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
254,575,905✔
867
  // current result is done in computing final results.
868
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
254,575,905✔
869
    closeResultRow(pResult);
9,246,831✔
870
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
9,246,800✔
871
    taosMemoryFree(pNode);
9,246,772✔
872
  }
873
}
254,575,790✔
874

875
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
61,552✔
876
                                       SExecTaskInfo* pTaskInfo) {
877
  int32_t         code = TSDB_CODE_SUCCESS;
61,552✔
878
  int32_t         lino = 0;
61,552✔
879
  SOpenWindowInfo openWin = {0};
61,552✔
880
  openWin.pos.pageId = pResult->pageId;
61,552✔
881
  openWin.pos.offset = pResult->offset;
61,552✔
882
  openWin.groupId = groupId;
61,552✔
883
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
61,552✔
884
  if (pn == NULL) {
61,552✔
885
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
59,445✔
886
    QUERY_CHECK_CODE(code, lino, _end);
59,445!
887
    return openWin.pos;
59,445✔
888
  }
889

890
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
2,107✔
891
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
2,107!
892
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
1✔
893
    QUERY_CHECK_CODE(code, lino, _end);
1!
894
  }
895

896
_end:
2,107✔
897
  if (code != TSDB_CODE_SUCCESS) {
2,107!
898
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
899
    pTaskInfo->code = code;
×
900
    T_LONG_JMP(pTaskInfo->env, code);
×
901
  }
902
  return openWin.pos;
2,107✔
903
}
904

905
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
2,715,506✔
906
  TSKEY* tsCols = NULL;
2,715,506✔
907

908
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
2,715,506!
909
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
2,715,563✔
910
    if (!pColDataInfo) {
2,715,434!
911
      pTaskInfo->code = terrno;
×
912
      T_LONG_JMP(pTaskInfo->env, terrno);
×
913
    }
914

915
    tsCols = (int64_t*)pColDataInfo->pData;
2,715,434✔
916
    if(tsCols[0] == 0) {
2,715,434✔
917
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0], tsCols[pBlock->info.rows - 1]);
1!
918
    }
919

920
    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
2,715,448!
921
      int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
850,453✔
922
      if (code != TSDB_CODE_SUCCESS) {
850,451!
923
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
924
        pTaskInfo->code = code;
×
925
        T_LONG_JMP(pTaskInfo->env, code);
×
926
      }
927
    }
928
  }
929

930
  return tsCols;
2,715,389✔
931
}
932

933
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
1,957,879✔
934
  if (OPTR_IS_OPENED(pOperator)) {
1,957,879✔
935
    return TSDB_CODE_SUCCESS;
479,103✔
936
  }
937

938
  int32_t        code = TSDB_CODE_SUCCESS;
1,478,776✔
939
  int32_t        lino = 0;
1,478,776✔
940
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,478,776✔
941
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1,478,776✔
942

943
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
1,478,776✔
944
  SExprSupp*                pSup = &pOperator->exprSupp;
1,478,776✔
945

946
  int32_t scanFlag = MAIN_SCAN;
1,478,776✔
947
  int64_t st = taosGetTimestampUs();
1,481,942✔
948

949
  pInfo->cleanGroupResInfo = false;
1,481,942✔
950
  while (1) {
2,286,190✔
951
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
3,768,132✔
952
    if (pBlock == NULL) {
3,771,204✔
953
      break;
1,484,668✔
954
    }
955

956
    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
2,286,536✔
957

958
    if (pInfo->scalarSupp.pExprInfo != NULL) {
2,286,536✔
959
      SExprSupp* pExprSup = &pInfo->scalarSupp;
105,786✔
960
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
105,786✔
961
      QUERY_CHECK_CODE(code, lino, _end);
105,778!
962
    }
963

964
    // the pDataBlock are always the same one, no need to call this again
965
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
2,286,528✔
966
    QUERY_CHECK_CODE(code, lino, _end);
2,286,717!
967
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
2,286,717✔
968
  }
969

970
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
1,484,806✔
971
  QUERY_CHECK_CODE(code, lino, _end);
1,484,955!
972
  pInfo->cleanGroupResInfo = true;
1,484,955✔
973

974
  OPTR_SET_OPENED(pOperator);
1,484,955✔
975

976
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1,485,187✔
977

978
_end:
1,485,187✔
979
  if (code != TSDB_CODE_SUCCESS) {
1,485,187!
980
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
981
    pTaskInfo->code = code;
×
982
    T_LONG_JMP(pTaskInfo->env, code);
×
983
  }
984
  return code;
1,485,187✔
985
}
986

987
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
70,694✔
988
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
70,694✔
989
  SExprSupp*     pSup = &pOperator->exprSupp;
70,694✔
990

991
  SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
70,694✔
992
  if (!pStateColInfoData) {
70,694!
993
    pTaskInfo->code = terrno;
×
994
    T_LONG_JMP(pTaskInfo->env, terrno);
×
995
  }
996
  int64_t          gid = pBlock->info.id.groupId;
70,694✔
997

998
  bool    masterScan = true;
70,694✔
999
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
70,694✔
1000
  int32_t bytes = pStateColInfoData->info.bytes;
70,694✔
1001

1002
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
70,694✔
1003
  if (!pColInfoData) {
70,694!
1004
    pTaskInfo->code = terrno;
×
1005
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1006
  }
1007
  TSKEY*           tsList = (TSKEY*)pColInfoData->pData;
70,694✔
1008

1009
  SWindowRowsSup* pRowSup = &pInfo->winSup;
70,694✔
1010
  pRowSup->numOfRows = 0;
70,694✔
1011
  pRowSup->startRowIndex = 0;
70,694✔
1012

1013
  struct SColumnDataAgg* pAgg = NULL;
70,694✔
1014
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
22,522,194✔
1015
    pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
22,451,513!
1016
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
44,903,026✔
1017
      continue;
7,930✔
1018
    }
1019

1020
    char* val = colDataGetData(pStateColInfoData, j);
22,443,583!
1021

1022
    if (gid != pRowSup->groupId || !pInfo->hasKey) {
22,443,583✔
1023
      // todo extract method
1024
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
61,399!
1025
        varDataCopy(pInfo->stateKey.pData, val);
×
1026
      } else {
1027
        memcpy(pInfo->stateKey.pData, val, bytes);
61,411✔
1028
      }
1029

1030
      pInfo->hasKey = true;
61,399✔
1031

1032
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
61,399✔
1033
      doKeepTuple(pRowSup, tsList[j], gid);
61,663✔
1034
    } else if (compareVal(val, &pInfo->stateKey)) {
22,382,184✔
1035
      doKeepTuple(pRowSup, tsList[j], gid);
2,389,883✔
1036
    } else {  // a new state window started
1037
      SResultRow* pResult = NULL;
19,992,440✔
1038

1039
      // keep the time window for the closed time window.
1040
      STimeWindow window = pRowSup->win;
19,992,440✔
1041

1042
      pRowSup->win.ekey = pRowSup->win.skey;
19,992,440✔
1043
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
19,992,440✔
1044
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1045
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
19,992,419!
1046
        T_LONG_JMP(pTaskInfo->env, ret);
×
1047
      }
1048

1049
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
19,992,419✔
1050
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
19,992,302✔
1051
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
19,992,302✔
1052
      if (ret != TSDB_CODE_SUCCESS) {
19,992,411!
1053
        T_LONG_JMP(pTaskInfo->env, ret);
×
1054
      }
1055

1056
      // here we start a new session window
1057
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
19,992,411✔
1058
      doKeepTuple(pRowSup, tsList[j], gid);
19,992,255✔
1059

1060
      // todo extract method
1061
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
19,992,191!
1062
        varDataCopy(pInfo->stateKey.pData, val);
×
1063
      } else {
1064
        memcpy(pInfo->stateKey.pData, val, bytes);
19,992,191✔
1065
      }
1066
    }
1067
  }
1068

1069
  SResultRow* pResult = NULL;
70,681✔
1070
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
70,681✔
1071
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
70,681✔
1072
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1073
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
70,694!
1074
    T_LONG_JMP(pTaskInfo->env, ret);
×
1075
  }
1076

1077
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
70,694✔
1078
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
70,694✔
1079
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
70,694✔
1080
  if (ret != TSDB_CODE_SUCCESS) {
70,694!
1081
    T_LONG_JMP(pTaskInfo->env, ret);
×
1082
  }
1083
}
70,694✔
1084

1085
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
78,716✔
1086
  if (OPTR_IS_OPENED(pOperator)) {
78,716✔
1087
    return TSDB_CODE_SUCCESS;
3,717✔
1088
  }
1089

1090
  int32_t                   code = TSDB_CODE_SUCCESS;
74,999✔
1091
  int32_t                   lino = 0;
74,999✔
1092
  SStateWindowOperatorInfo* pInfo = pOperator->info;
74,999✔
1093
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
74,999✔
1094

1095
  SExprSupp* pSup = &pOperator->exprSupp;
74,999✔
1096
  int32_t    order = pInfo->binfo.inputTsOrder;
74,999✔
1097
  int64_t    st = taosGetTimestampUs();
74,999✔
1098

1099
  SOperatorInfo* downstream = pOperator->pDownstream[0];
74,999✔
1100
  pInfo->cleanGroupResInfo = false;
74,999✔
1101
  while (1) {
70,694✔
1102
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
145,693✔
1103
    if (pBlock == NULL) {
145,693✔
1104
      break;
74,999✔
1105
    }
1106

1107
    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
70,694✔
1108
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
70,694✔
1109
    QUERY_CHECK_CODE(code, lino, _end);
70,694!
1110

1111
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
70,694✔
1112
    QUERY_CHECK_CODE(code, lino, _end);
70,694!
1113

1114
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
1115
    if (pInfo->scalarSup.pExprInfo != NULL) {
70,694✔
1116
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
5,649✔
1117
                                              pInfo->scalarSup.numOfExprs, NULL);
1118
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
5,649!
1119
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1120
      }
1121
    }
1122

1123
    doStateWindowAggImpl(pOperator, pInfo, pBlock);
70,694✔
1124
  }
1125

1126
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
74,999✔
1127
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
74,999✔
1128
  QUERY_CHECK_CODE(code, lino, _end);
74,999!
1129
  pInfo->cleanGroupResInfo = true;
74,999✔
1130
  pOperator->status = OP_RES_TO_RETURN;
74,999✔
1131

1132
_end:
74,999✔
1133
  if (code != TSDB_CODE_SUCCESS) {
74,999!
1134
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1135
    pTaskInfo->code = code;
×
1136
    T_LONG_JMP(pTaskInfo->env, code);
×
1137
  }
1138
  return code;
74,999✔
1139
}
1140

1141
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
137,104✔
1142
  if (pOperator->status == OP_EXEC_DONE) {
137,104✔
1143
    (*ppRes) = NULL;
58,388✔
1144
    return TSDB_CODE_SUCCESS;
58,388✔
1145
  }
1146

1147
  int32_t                   code = TSDB_CODE_SUCCESS;
78,716✔
1148
  int32_t                   lino = 0;
78,716✔
1149
  SStateWindowOperatorInfo* pInfo = pOperator->info;
78,716✔
1150
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
78,716✔
1151
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;
78,716✔
1152

1153
  code = pOperator->fpSet._openFn(pOperator);
78,716✔
1154
  QUERY_CHECK_CODE(code, lino, _end);
78,716!
1155

1156
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
78,716✔
1157
  QUERY_CHECK_CODE(code, lino, _end);
78,716!
1158

1159
  while (1) {
×
1160
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
78,716✔
1161
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
78,715✔
1162
    QUERY_CHECK_CODE(code, lino, _end);
78,715!
1163

1164
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
78,715✔
1165
    if (!hasRemain) {
78,715✔
1166
      setOperatorCompleted(pOperator);
74,998✔
1167
      break;
74,998✔
1168
    }
1169

1170
    if (pBInfo->pRes->info.rows > 0) {
3,717!
1171
      break;
3,717✔
1172
    }
1173
  }
1174

1175
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
78,715✔
1176

1177
_end:
78,715✔
1178
  if (code != TSDB_CODE_SUCCESS) {
78,715!
1179
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1180
    pTaskInfo->code = code;
×
1181
    T_LONG_JMP(pTaskInfo->env, code);
×
1182
  }
1183
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
78,715✔
1184
  return code;
78,715✔
1185
}
1186

1187
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
2,809,237✔
1188
  int32_t                   code = TSDB_CODE_SUCCESS;
2,809,237✔
1189
  int32_t                   lino = 0;
2,809,237✔
1190
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
2,809,237✔
1191
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
2,809,237✔
1192

1193
  if (pOperator->status == OP_EXEC_DONE) {
2,809,237✔
1194
    (*ppRes) = NULL;
851,211✔
1195
    return code;
851,211✔
1196
  }
1197

1198
  SSDataBlock* pBlock = pInfo->binfo.pRes;
1,958,026✔
1199
  code = pOperator->fpSet._openFn(pOperator);
1,958,026✔
1200
  QUERY_CHECK_CODE(code, lino, _end);
1,964,143!
1201

1202
  while (1) {
15✔
1203
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,964,158✔
1204
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
1,964,170✔
1205
    QUERY_CHECK_CODE(code, lino, _end);
1,964,006!
1206

1207
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,964,006✔
1208
    if (!hasRemain) {
1,964,084✔
1209
      setOperatorCompleted(pOperator);
1,484,584✔
1210
      break;
1,484,738✔
1211
    }
1212

1213
    if (pBlock->info.rows > 0) {
479,500✔
1214
      break;
479,485✔
1215
    }
1216
  }
1217

1218
  size_t rows = pBlock->info.rows;
1,964,223✔
1219
  pOperator->resultInfo.totalRows += rows;
1,964,223✔
1220

1221
_end:
1,964,223✔
1222
  if (code != TSDB_CODE_SUCCESS) {
1,964,223!
1223
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1224
    pTaskInfo->code = code;
×
1225
    T_LONG_JMP(pTaskInfo->env, code);
×
1226
  }
1227
  (*ppRes) = (rows == 0) ? NULL : pBlock;
1,964,223✔
1228
  return code;
1,964,223✔
1229
}
1230

1231
static void destroyStateWindowOperatorInfo(void* param) {
74,999✔
1232
  if (param == NULL) {
74,999!
1233
    return;
×
1234
  }
1235
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
74,999✔
1236
  cleanupBasicInfo(&pInfo->binfo);
74,999✔
1237
  taosMemoryFreeClear(pInfo->stateKey.pData);
74,998!
1238
  if (pInfo->pOperator) {
74,998!
1239
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
74,998✔
1240
                      pInfo->cleanGroupResInfo);
74,998✔
1241
    pInfo->pOperator = NULL;
74,997✔
1242
  }
1243

1244
  cleanupExprSupp(&pInfo->scalarSup);
74,997✔
1245
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
74,997✔
1246
  cleanupAggSup(&pInfo->aggSup);
74,998✔
1247
  cleanupGroupResInfo(&pInfo->groupResInfo);
74,999✔
1248

1249
  taosMemoryFreeClear(param);
74,999!
1250
}
1251

1252
static void freeItem(void* param) {
182,483✔
1253
  SGroupKeys* pKey = (SGroupKeys*)param;
182,483✔
1254
  taosMemoryFree(pKey->pData);
182,483✔
1255
}
182,485✔
1256

1257
void destroyIntervalOperatorInfo(void* param) {
2,029,161✔
1258
  if (param == NULL) {
2,029,161!
1259
    return;
×
1260
  }
1261

1262
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
2,029,161✔
1263

1264
  cleanupBasicInfo(&pInfo->binfo);
2,029,161✔
1265
  if (pInfo->pOperator) {
2,029,254!
1266
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
2,029,257✔
1267
                      pInfo->cleanGroupResInfo);
2,029,257✔
1268
    pInfo->pOperator = NULL;
2,029,065✔
1269
  }
1270

1271
  cleanupAggSup(&pInfo->aggSup);
2,029,062✔
1272
  cleanupExprSupp(&pInfo->scalarSupp);
2,029,270✔
1273

1274
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
2,029,171✔
1275

1276
  taosArrayDestroy(pInfo->pInterpCols);
2,029,150✔
1277
  pInfo->pInterpCols = NULL;
2,029,145✔
1278

1279
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
2,029,145✔
1280
  pInfo->pPrevValues = NULL;
2,029,094✔
1281

1282
  cleanupGroupResInfo(&pInfo->groupResInfo);
2,029,094✔
1283
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
2,029,225✔
1284
  destroyBoundedQueue(pInfo->pBQ);
2,029,276✔
1285
  taosMemoryFreeClear(param);
2,029,183!
1286
}
1287

1288
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
2,025,009✔
1289
                                   bool* pRes) {
1290
  // the primary timestamp column
1291
  bool    needed = false;
2,025,009✔
1292
  int32_t code = TSDB_CODE_SUCCESS;
2,025,009✔
1293
  int32_t lino = 0;
2,025,009✔
1294
  void*   tmp = NULL;
2,025,009✔
1295

1296
  for (int32_t i = 0; i < numOfCols; ++i) {
6,617,026✔
1297
    SExprInfo* pExpr = pCtx[i].pExpr;
4,681,572✔
1298
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
4,681,572✔
1299
      needed = true;
90,869✔
1300
      break;
90,869✔
1301
    }
1302
  }
1303

1304
  if (needed) {
2,026,323✔
1305
    pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
90,869✔
1306
    QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
90,869!
1307

1308
    pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
90,869✔
1309
    QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
90,869!
1310

1311
    {  // ts column
1312
      SColumn c = {0};
90,869✔
1313
      c.colId = 1;
90,869✔
1314
      c.slotId = pInfo->primaryTsIndex;
90,869✔
1315
      c.type = TSDB_DATA_TYPE_TIMESTAMP;
90,869✔
1316
      c.bytes = sizeof(int64_t);
90,869✔
1317
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
90,869✔
1318
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
90,869!
1319

1320
      SGroupKeys key;
1321
      key.bytes = c.bytes;
90,869✔
1322
      key.type = c.type;
90,869✔
1323
      key.isNull = true;  // to denote no value is assigned yet
90,869✔
1324
      key.pData = taosMemoryCalloc(1, c.bytes);
90,869✔
1325
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
90,869!
1326

1327
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
90,869✔
1328
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
90,869!
1329
    }
1330
  }
1331

1332
  for (int32_t i = 0; i < numOfCols; ++i) {
6,705,882✔
1333
    SExprInfo* pExpr = pCtx[i].pExpr;
4,678,463✔
1334

1335
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
4,678,463✔
1336
      SFunctParam* pParam = &pExpr->base.pParam[0];
92,076✔
1337

1338
      SColumn c = *pParam->pCol;
92,076✔
1339
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
92,076✔
1340
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
91,617!
1341

1342
      SGroupKeys key = {0};
91,617✔
1343
      key.bytes = c.bytes;
91,617✔
1344
      key.type = c.type;
91,617✔
1345
      key.isNull = false;
91,617✔
1346
      key.pData = taosMemoryCalloc(1, c.bytes);
91,617✔
1347
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
91,617!
1348

1349
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
91,617✔
1350
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
91,617!
1351
    }
1352
  }
1353

1354
_end:
2,027,419✔
1355
  if (code != TSDB_CODE_SUCCESS) {
2,027,419!
1356
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1357
  }
1358
  *pRes = needed;
2,026,050✔
1359
  return code;
2,026,050✔
1360
}
1361

1362
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
1,482,399✔
1363
                                   SOperatorInfo** pOptrInfo) {
1364
  QRY_PARAM_CHECK(pOptrInfo);
1,482,399!
1365

1366
  int32_t                   code = TSDB_CODE_SUCCESS;
1,482,399✔
1367
  int32_t                   lino = 0;
1,482,399✔
1368
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
1,482,399✔
1369
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,483,060✔
1370
  if (pInfo == NULL || pOperator == NULL) {
1,483,748!
1371
    code = terrno;
15✔
1372
    lino = __LINE__;
×
1373
    goto _error;
×
1374
  }
1375

1376
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
1,483,733✔
1377
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,485,012!
1378
  initBasicInfo(&pInfo->binfo, pResBlock);
1,485,012✔
1379

1380
  SExprSupp* pSup = &pOperator->exprSupp;
1,484,497✔
1381
  pSup->hasWindowOrGroup = true;
1,484,497✔
1382

1383
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
1,484,497✔
1384

1385
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,484,497✔
1386
  initResultSizeInfo(&pOperator->resultInfo, 512);
1,484,497✔
1387
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
1,484,794✔
1388
  QUERY_CHECK_CODE(code, lino, _error);
1,483,998!
1389

1390
  int32_t    num = 0;
1,483,998✔
1391
  SExprInfo* pExprInfo = NULL;
1,483,998✔
1392
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
1,483,998✔
1393
  QUERY_CHECK_CODE(code, lino, _error);
1,484,913!
1394

1395
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
1,484,913✔
1396
                    &pTaskInfo->storageAPI.functionStore);
1397
  QUERY_CHECK_CODE(code, lino, _error);
1,484,064!
1398

1399
  SInterval interval = {.interval = pPhyNode->interval,
1,484,064✔
1400
                        .sliding = pPhyNode->sliding,
1,484,064✔
1401
                        .intervalUnit = pPhyNode->intervalUnit,
1,484,064✔
1402
                        .slidingUnit = pPhyNode->slidingUnit,
1,484,064✔
1403
                        .offset = pPhyNode->offset,
1,484,064✔
1404
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
1,484,064✔
1405
                        .timeRange = pPhyNode->timeRange};
1406
  calcIntervalAutoOffset(&interval);
1,484,064✔
1407

1408
  STimeWindowAggSupp as = {
1,483,770✔
1409
      .waterMark = pPhyNode->window.watermark,
1,483,770✔
1410
      .calTrigger = pPhyNode->window.triggerType,
1,483,770✔
1411
      .maxTs = INT64_MIN,
1412
  };
1413

1414
  pInfo->win = pTaskInfo->window;
1,483,770✔
1415
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
1,483,770✔
1416
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
1,483,770✔
1417
  pInfo->interval = interval;
1,483,770✔
1418
  pInfo->twAggSup = as;
1,483,770✔
1419
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
1,483,770✔
1420
  if (pPhyNode->window.node.pLimit) {
1,483,770✔
1421
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
43,496✔
1422
    pInfo->limited = true;
43,496✔
1423
    pInfo->limit = pLimit->limit + pLimit->offset;
43,496✔
1424
  }
1425
  if (pPhyNode->window.node.pSlimit) {
1,483,770✔
1426
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
560✔
1427
    pInfo->slimited = true;
560✔
1428
    pInfo->slimit = pLimit->limit + pLimit->offset;
560✔
1429
    pInfo->curGroupId = UINT64_MAX;
560✔
1430
  }
1431

1432
  if (pPhyNode->window.pExprs != NULL) {
1,483,770✔
1433
    int32_t    numOfScalar = 0;
3,673✔
1434
    SExprInfo* pScalarExprInfo = NULL;
3,673✔
1435
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
3,673✔
1436
    QUERY_CHECK_CODE(code, lino, _error);
3,674!
1437

1438
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
3,674✔
1439
    if (code != TSDB_CODE_SUCCESS) {
3,671!
1440
      goto _error;
×
1441
    }
1442
  }
1443

1444
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
1,483,768✔
1445
  if (code != TSDB_CODE_SUCCESS) {
1,482,377!
1446
    goto _error;
×
1447
  }
1448

1449
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
1,482,377✔
1450
  QUERY_CHECK_CODE(code, lino, _error);
1,483,144!
1451

1452
  pInfo->timeWindowInterpo = false;
1,483,144✔
1453
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
1,483,144✔
1454
  QUERY_CHECK_CODE(code, lino, _error);
1,482,521!
1455
  if (pInfo->timeWindowInterpo) {
1,482,521✔
1456
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
90,868✔
1457
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
90,868!
1458
      goto _error;
×
1459
    }
1460
  }
1461

1462
  pInfo->pOperator = pOperator;
1,482,521✔
1463
  pInfo->cleanGroupResInfo = false;
1,482,521✔
1464
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1,482,521✔
1465
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
1,481,220✔
1466
                  pInfo, pTaskInfo);
1467

1468
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
1,480,538✔
1469
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1470

1471
  code = appendDownstream(pOperator, &downstream, 1);
1,480,575✔
1472
  if (code != TSDB_CODE_SUCCESS) {
1,483,242!
1473
    goto _error;
×
1474
  }
1475

1476
  *pOptrInfo = pOperator;
1,483,242✔
1477
  return TSDB_CODE_SUCCESS;
1,483,242✔
1478

1479
_error:
×
1480
  if (pInfo != NULL) {
×
1481
    destroyIntervalOperatorInfo(pInfo);
×
1482
  }
1483

1484
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1485
  pTaskInfo->code = code;
×
1486
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
×
1487
  return code;
×
1488
}
1489

1490
// todo handle multiple timeline cases. assume no timeline interweaving
1491
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
112,713✔
1492
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
112,713✔
1493
  SExprSupp*     pSup = &pOperator->exprSupp;
112,713✔
1494

1495
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
112,713✔
1496
  if (!pColInfoData) {
112,713!
1497
    pTaskInfo->code = terrno;
×
1498
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1499
  }
1500

1501
  bool    masterScan = true;
112,713✔
1502
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
112,713✔
1503
  int64_t gid = pBlock->info.id.groupId;
112,713✔
1504

1505
  int64_t gap = pInfo->gap;
112,713✔
1506

1507
  if (!pInfo->reptScan) {
112,713✔
1508
    pInfo->reptScan = true;
84,395✔
1509
    pInfo->winSup.prevTs = INT64_MIN;
84,395✔
1510
  }
1511

1512
  SWindowRowsSup* pRowSup = &pInfo->winSup;
112,713✔
1513
  pRowSup->numOfRows = 0;
112,713✔
1514
  pRowSup->startRowIndex = 0;
112,713✔
1515

1516
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
1517
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
112,713✔
1518
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
34,178,051✔
1519
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
34,065,336✔
1520
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
104,529✔
1521
      doKeepTuple(pRowSup, tsList[j], gid);
104,534✔
1522
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
33,960,807✔
1523
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
8,222,938!
1524
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1525
      doKeepTuple(pRowSup, tsList[j], gid);
25,737,889✔
1526
    } else {  // start a new session window
1527
      // start a new session window
1528
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
8,222,918✔
1529
        SResultRow* pResult = NULL;
8,219,940✔
1530

1531
        // keep the time window for the closed time window.
1532
        STimeWindow window = pRowSup->win;
8,219,940✔
1533

1534
        int32_t ret =
1535
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
8,219,940✔
1536
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1537
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
8,219,946!
1538
          T_LONG_JMP(pTaskInfo->env, ret);
×
1539
        }
1540

1541
        // pInfo->numOfRows data belong to the current session window
1542
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
8,219,946✔
1543
        ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
8,219,941✔
1544
                                              pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
8,219,941✔
1545
        if (ret != TSDB_CODE_SUCCESS) {
8,220,068!
1546
          T_LONG_JMP(pTaskInfo->env, ret);
×
1547
        }
1548
      }
1549

1550
      // here we start a new session window
1551
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
8,223,046✔
1552
      doKeepTuple(pRowSup, tsList[j], gid);
8,223,014✔
1553
    }
1554
  }
1555

1556
  SResultRow* pResult = NULL;
112,715✔
1557
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
112,715✔
1558
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
112,715✔
1559
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1560
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
112,713!
1561
    T_LONG_JMP(pTaskInfo->env, ret);
×
1562
  }
1563

1564
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
112,713✔
1565
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
112,713✔
1566
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
112,713✔
1567
  if (ret != TSDB_CODE_SUCCESS) {
112,713!
1568
    T_LONG_JMP(pTaskInfo->env, ret);
×
1569
  }
1570
}
112,713✔
1571

1572
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
203,182✔
1573
  if (pOperator->status == OP_EXEC_DONE) {
203,182✔
1574
    (*ppRes) = NULL;
82,957✔
1575
    return TSDB_CODE_SUCCESS;
82,957✔
1576
  }
1577

1578
  int32_t                  code = TSDB_CODE_SUCCESS;
120,225✔
1579
  int32_t                  lino = 0;
120,225✔
1580
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
120,225✔
1581
  SSessionAggOperatorInfo* pInfo = pOperator->info;
120,225✔
1582
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
120,225✔
1583
  SExprSupp*               pSup = &pOperator->exprSupp;
120,225✔
1584

1585
  pInfo->cleanGroupResInfo = false;
120,225✔
1586
  if (pOperator->status == OP_RES_TO_RETURN) {
120,225✔
1587
    while (1) {
×
1588
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
20,139✔
1589
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
20,139✔
1590
      QUERY_CHECK_CODE(code, lino, _end);
20,139!
1591

1592
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
20,139✔
1593
      if (!hasRemain) {
20,139✔
1594
        setOperatorCompleted(pOperator);
3,502✔
1595
        break;
3,502✔
1596
      }
1597

1598
      if (pBInfo->pRes->info.rows > 0) {
16,637!
1599
        break;
16,637✔
1600
      }
1601
    }
1602
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
20,139✔
1603
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
20,139!
1604
    return code;
20,139✔
1605
  }
1606

1607
  int64_t st = taosGetTimestampUs();
100,093✔
1608
  int32_t order = pInfo->binfo.inputTsOrder;
100,093✔
1609

1610
  SOperatorInfo* downstream = pOperator->pDownstream[0];
100,093✔
1611

1612
  while (1) {
112,713✔
1613
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
212,806✔
1614
    if (pBlock == NULL) {
212,808✔
1615
      break;
100,096✔
1616
    }
1617

1618
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
112,712✔
1619
    if (pInfo->scalarSupp.pExprInfo != NULL) {
112,712✔
1620
      SExprSupp* pExprSup = &pInfo->scalarSupp;
3✔
1621
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
3✔
1622
      QUERY_CHECK_CODE(code, lino, _end);
3!
1623
    }
1624
    // the pDataBlock are always the same one, no need to call this again
1625
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
112,712✔
1626
    QUERY_CHECK_CODE(code, lino, _end);
112,713!
1627

1628
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
112,713✔
1629
    QUERY_CHECK_CODE(code, lino, _end);
112,713!
1630

1631
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
112,713✔
1632
  }
1633

1634
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
100,096✔
1635

1636
  // restore the value
1637
  pOperator->status = OP_RES_TO_RETURN;
100,096✔
1638

1639
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
100,096✔
1640
  QUERY_CHECK_CODE(code, lino, _end);
100,097!
1641
  pInfo->cleanGroupResInfo = true;
100,097✔
1642

1643
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
100,097✔
1644
  QUERY_CHECK_CODE(code, lino, _end);
100,097!
1645
  while (1) {
×
1646
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
100,097✔
1647
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
100,097✔
1648
    QUERY_CHECK_CODE(code, lino, _end);
100,097!
1649

1650
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
100,097✔
1651
    if (!hasRemain) {
100,097✔
1652
      setOperatorCompleted(pOperator);
96,595✔
1653
      break;
96,595✔
1654
    }
1655

1656
    if (pBInfo->pRes->info.rows > 0) {
3,502!
1657
      break;
3,502✔
1658
    }
1659
  }
1660
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
100,097✔
1661

1662
_end:
100,097✔
1663
  if (code != TSDB_CODE_SUCCESS) {
100,097!
1664
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1665
    pTaskInfo->code = code;
×
1666
    T_LONG_JMP(pTaskInfo->env, code);
×
1667
  }
1668
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
100,097✔
1669
  return code;
100,097✔
1670
}
1671

1672
// todo make this as an non-blocking operator
1673
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
74,999✔
1674
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1675
  QRY_PARAM_CHECK(pOptrInfo);
74,999!
1676

1677
  int32_t                   code = TSDB_CODE_SUCCESS;
74,999✔
1678
  int32_t                   lino = 0;
74,999✔
1679
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
74,999✔
1680
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
74,999✔
1681
  if (pInfo == NULL || pOperator == NULL) {
74,999!
1682
    code = terrno;
×
1683
    goto _error;
×
1684
  }
1685

1686
  pOperator->exprSupp.hasWindowOrGroup = true;
74,999✔
1687
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
74,999✔
1688
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
74,999✔
1689

1690
  if (pStateNode->window.pExprs != NULL) {
74,999✔
1691
    int32_t    numOfScalarExpr = 0;
4,969✔
1692
    SExprInfo* pScalarExprInfo = NULL;
4,969✔
1693
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
4,969✔
1694
    QUERY_CHECK_CODE(code, lino, _error);
4,969!
1695

1696
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
4,969✔
1697
    if (code != TSDB_CODE_SUCCESS) {
4,969!
1698
      goto _error;
×
1699
    }
1700
  }
1701

1702
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
74,999✔
1703
  pInfo->stateKey.type = pInfo->stateCol.type;
74,999✔
1704
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
74,999✔
1705
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
74,999✔
1706
  if (pInfo->stateKey.pData == NULL) {
74,999!
1707
    goto _error;
×
1708
  }
1709
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
74,999✔
1710
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
74,999✔
1711

1712
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
74,999✔
1713
  if (code != TSDB_CODE_SUCCESS) {
74,999!
1714
    goto _error;
×
1715
  }
1716

1717
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
74,999✔
1718

1719
  int32_t    num = 0;
74,999✔
1720
  SExprInfo* pExprInfo = NULL;
74,999✔
1721
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
74,999✔
1722
  QUERY_CHECK_CODE(code, lino, _error);
74,999!
1723

1724
  initResultSizeInfo(&pOperator->resultInfo, 4096);
74,999✔
1725

1726
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
74,999✔
1727
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
74,999✔
1728
  if (code != TSDB_CODE_SUCCESS) {
74,999!
1729
    goto _error;
×
1730
  }
1731

1732
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
74,999✔
1733
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
74,999!
1734
  initBasicInfo(&pInfo->binfo, pResBlock);
74,999✔
1735
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
74,999✔
1736

1737
  pInfo->twAggSup =
74,998✔
1738
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
74,998✔
1739

1740
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
74,998✔
1741
  QUERY_CHECK_CODE(code, lino, _error);
74,999!
1742

1743
  pInfo->tsSlotId = tsSlotId;
74,999✔
1744
  pInfo->pOperator = pOperator;
74,999✔
1745
  pInfo->cleanGroupResInfo = false;
74,999✔
1746
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
74,999✔
1747
                  pTaskInfo);
1748
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
74,999✔
1749
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1750

1751
  code = appendDownstream(pOperator, &downstream, 1);
74,999✔
1752
  if (code != TSDB_CODE_SUCCESS) {
74,999!
1753
    goto _error;
×
1754
  }
1755

1756
  *pOptrInfo = pOperator;
74,999✔
1757
  return TSDB_CODE_SUCCESS;
74,999✔
1758

1759
_error:
×
1760
  if (pInfo != NULL) {
×
1761
    destroyStateWindowOperatorInfo(pInfo);
×
1762
  }
1763

1764
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1765
  pTaskInfo->code = code;
×
1766
  return code;
×
1767
}
1768

1769
void destroySWindowOperatorInfo(void* param) {
100,096✔
1770
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
100,096✔
1771
  if (pInfo == NULL) {
100,096!
1772
    return;
×
1773
  }
1774

1775
  cleanupBasicInfo(&pInfo->binfo);
100,096✔
1776
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
100,097✔
1777
  if (pInfo->pOperator) {
100,097!
1778
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
100,097✔
1779
                      pInfo->cleanGroupResInfo);
100,097✔
1780
    pInfo->pOperator = NULL;
100,096✔
1781
  }
1782

1783
  cleanupAggSup(&pInfo->aggSup);
100,096✔
1784
  cleanupExprSupp(&pInfo->scalarSupp);
100,097✔
1785

1786
  cleanupGroupResInfo(&pInfo->groupResInfo);
100,097✔
1787
  taosMemoryFreeClear(param);
100,097!
1788
}
1789

1790
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
100,087✔
1791
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1792
  QRY_PARAM_CHECK(pOptrInfo);
100,087!
1793

1794
  int32_t                  code = TSDB_CODE_SUCCESS;
100,087✔
1795
  int32_t                  lino = 0;
100,087✔
1796
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
100,087✔
1797
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
100,093✔
1798
  if (pInfo == NULL || pOperator == NULL) {
100,094!
1799
    code = terrno;
×
1800
    goto _error;
×
1801
  }
1802

1803
  pOperator->exprSupp.hasWindowOrGroup = true;
100,094✔
1804

1805
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
100,094✔
1806
  initResultSizeInfo(&pOperator->resultInfo, 4096);
100,094✔
1807

1808
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
100,097✔
1809
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
100,097!
1810
  initBasicInfo(&pInfo->binfo, pResBlock);
100,097✔
1811

1812
  int32_t      numOfCols = 0;
100,097✔
1813
  SExprInfo*   pExprInfo = NULL;
100,097✔
1814
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
100,097✔
1815
  QUERY_CHECK_CODE(code, lino, _error);
100,097!
1816

1817
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
100,097✔
1818
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
100,097✔
1819
  QUERY_CHECK_CODE(code, lino, _error);
100,096!
1820

1821
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
100,096✔
1822
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
100,096✔
1823
  pInfo->gap = pSessionNode->gap;
100,096✔
1824

1825
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
100,096✔
1826
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
100,095✔
1827
  QUERY_CHECK_CODE(code, lino, _error);
100,097!
1828

1829
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
100,097✔
1830
  pInfo->binfo.pRes = pResBlock;
100,097✔
1831
  pInfo->winSup.prevTs = INT64_MIN;
100,097✔
1832
  pInfo->reptScan = false;
100,097✔
1833
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
100,097✔
1834
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
100,097✔
1835

1836
  if (pSessionNode->window.pExprs != NULL) {
100,097✔
1837
    int32_t    numOfScalar = 0;
1✔
1838
    SExprInfo* pScalarExprInfo = NULL;
1✔
1839
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
1✔
1840
    QUERY_CHECK_CODE(code, lino, _error);
1!
1841

1842
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
1✔
1843
    QUERY_CHECK_CODE(code, lino, _error);
1!
1844
  }
1845

1846
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
100,097✔
1847
  QUERY_CHECK_CODE(code, lino, _error);
100,092!
1848

1849
  pInfo->pOperator = pOperator;
100,092✔
1850
  pInfo->cleanGroupResInfo = false;
100,092✔
1851
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
100,092✔
1852
                  pInfo, pTaskInfo);
1853
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
100,094✔
1854
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1855
  pOperator->pTaskInfo = pTaskInfo;
100,092✔
1856
  code = appendDownstream(pOperator, &downstream, 1);
100,092✔
1857
  QUERY_CHECK_CODE(code, lino, _error);
100,096!
1858

1859
  *pOptrInfo = pOperator;
100,096✔
1860
  return TSDB_CODE_SUCCESS;
100,096✔
1861

1862
_error:
×
1863
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
1864
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1865
  pTaskInfo->code = code;
×
1866
  return code;
×
1867
}
1868

1869
void destroyMAIOperatorInfo(void* param) {
543,951✔
1870
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
543,951✔
1871
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
543,951✔
1872
  taosMemoryFreeClear(param);
543,956!
1873
}
543,955✔
1874

1875
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
346,505✔
1876
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
346,505✔
1877
  if (NULL == pResult) {
346,505!
1878
    return pResult;
×
1879
  }
1880
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
346,505✔
1881
  return pResult;
346,505✔
1882
}
1883

1884
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
70,192,799✔
1885
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
1886
  if (*pResult == NULL) {
70,192,799✔
1887
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
346,505✔
1888
    if (*pResult == NULL) {
346,504!
1889
      return terrno;
×
1890
    }
1891
  }
1892

1893
  // set time window for current result
1894
  (*pResult)->win = (*win);
70,192,798✔
1895
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
70,192,798✔
1896
}
1897

1898
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
428,861✔
1899
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
1900
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
428,861✔
1901
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
428,861✔
1902

1903
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
428,861✔
1904
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
428,861✔
1905
  SInterval*     pInterval = &iaInfo->interval;
428,861✔
1906

1907
  int32_t  startPos = 0;
428,861✔
1908
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
428,861✔
1909

1910
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
428,858✔
1911

1912
  // there is an result exists
1913
  if (miaInfo->curTs != INT64_MIN) {
428,857✔
1914
    if (ts != miaInfo->curTs) {
59,794✔
1915
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
43,139✔
1916
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
43,139✔
1917
      miaInfo->curTs = ts;
43,139✔
1918
    }
1919
  } else {
1920
    miaInfo->curTs = ts;
369,063✔
1921
  }
1922

1923
  STimeWindow win = {0};
428,857✔
1924
  win.skey = miaInfo->curTs;
428,857✔
1925
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
428,857✔
1926

1927
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
428,859✔
1928
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
428,857!
1929
    T_LONG_JMP(pTaskInfo->env, ret);
×
1930
  }
1931

1932
  int32_t currPos = startPos;
428,857✔
1933

1934
  STimeWindow currWin = win;
428,857✔
1935
  while (++currPos < pBlock->info.rows) {
116,709,745✔
1936
    if (tsCols[currPos] == miaInfo->curTs) {
116,284,061✔
1937
      continue;
46,529,226✔
1938
    }
1939

1940
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
69,754,835✔
1941
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
69,758,234✔
1942
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
69,758,234✔
1943
    if (ret != TSDB_CODE_SUCCESS) {
69,771,583!
1944
      T_LONG_JMP(pTaskInfo->env, ret);
×
1945
    }
1946

1947
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
69,771,583✔
1948
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
69,666,097✔
1949
    miaInfo->curTs = tsCols[currPos];
69,705,870✔
1950

1951
    currWin.skey = miaInfo->curTs;
69,705,870✔
1952
    currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
69,705,870✔
1953

1954
    startPos = currPos;
69,756,020✔
1955
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
69,756,020✔
1956
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
69,750,803!
1957
      T_LONG_JMP(pTaskInfo->env, ret);
×
1958
    }
1959

1960
    miaInfo->curTs = currWin.skey;
69,751,662✔
1961
  }
1962

1963
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
425,684✔
1964
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
428,862✔
1965
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
428,862✔
1966
  if (ret != TSDB_CODE_SUCCESS) {
428,860!
1967
    T_LONG_JMP(pTaskInfo->env, ret);
×
1968
  }
1969
}
428,860✔
1970

1971
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
368,608✔
1972
  pRes->info.id.groupId = pMiaInfo->groupId;
368,608✔
1973
  pMiaInfo->curTs = INT64_MIN;
368,608✔
1974
  pMiaInfo->groupId = 0;
368,608✔
1975
}
368,608✔
1976

1977
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
628,634✔
1978
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
628,634✔
1979
  int32_t                               code = TSDB_CODE_SUCCESS;
628,634✔
1980
  int32_t                               lino = 0;
628,634✔
1981
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
628,634✔
1982
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
628,634✔
1983

1984
  SExprSupp*      pSup = &pOperator->exprSupp;
628,634✔
1985
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
628,634✔
1986
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
628,634✔
1987
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
628,634✔
1988

1989
  while (1) {
366,281✔
1990
    SSDataBlock* pBlock = NULL;
994,915✔
1991
    if (pMiaInfo->prefetchedBlock == NULL) {
994,915✔
1992
      pBlock = getNextBlockFromDownstream(pOperator, 0);
972,354✔
1993
    } else {
1994
      pBlock = pMiaInfo->prefetchedBlock;
22,561✔
1995
      pMiaInfo->prefetchedBlock = NULL;
22,561✔
1996

1997
      pMiaInfo->groupId = pBlock->info.id.groupId;
22,561✔
1998
    }
1999

2000
    // no data exists, all query processing is done
2001
    if (pBlock == NULL) {
994,909✔
2002
      // close last unclosed time window
2003
      if (pMiaInfo->curTs != INT64_MIN) {
543,494✔
2004
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
346,046✔
2005
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
346,047✔
2006
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
346,047✔
2007
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
346,047✔
2008
        QUERY_CHECK_CODE(code, lino, _end);
346,047!
2009
      }
2010

2011
      setOperatorCompleted(pOperator);
543,495✔
2012
      break;
543,496✔
2013
    }
2014

2015
    if (pMiaInfo->groupId == 0) {
451,415✔
2016
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
398,393✔
2017
        pMiaInfo->groupId = pBlock->info.id.groupId;
2,271✔
2018
        pRes->info.id.groupId = pMiaInfo->groupId;
2,271✔
2019
      }
2020
    } else {
2021
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
53,022✔
2022
        // if there are unclosed time window, close it firstly.
2023
        if (pMiaInfo->curTs == INT64_MIN) {
22,561!
2024
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2025
          T_LONG_JMP(pTaskInfo->env, terrno);
×
2026
        }
2027
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
22,561✔
2028
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
22,561✔
2029

2030
        pMiaInfo->prefetchedBlock = pBlock;
22,561✔
2031
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
22,561✔
2032
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
22,561✔
2033
        QUERY_CHECK_CODE(code, lino, _end);
22,561!
2034
        if (pRes->info.rows == 0) {
22,561✔
2035
          // After filtering for last group, the result is empty, so we need to continue to process next group
2036
          continue;
34✔
2037
        } else {
2038
          break;
22,527✔
2039
        }
2040
      } else {
2041
        // continue
2042
        pRes->info.id.groupId = pMiaInfo->groupId;
30,461✔
2043
      }
2044
    }
2045

2046
    pRes->info.scanFlag = pBlock->info.scanFlag;
428,854✔
2047
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
428,854✔
2048
    QUERY_CHECK_CODE(code, lino, _end);
428,861!
2049

2050
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
428,861✔
2051

2052
    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
428,860✔
2053
    QUERY_CHECK_CODE(code, lino, _end);
428,863!
2054

2055
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
428,863✔
2056
      break;
62,616✔
2057
    }
2058
  }
2059

2060
_end:
628,639✔
2061
  if (code != TSDB_CODE_SUCCESS) {
628,639!
2062
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2063
    pTaskInfo->code = code;
×
2064
    T_LONG_JMP(pTaskInfo->env, code);
×
2065
  }
2066
}
628,639✔
2067

2068
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,251,003✔
2069
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
1,251,003✔
2070
  int32_t                               code = TSDB_CODE_SUCCESS;
1,251,003✔
2071
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
1,251,003✔
2072
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
1,251,003✔
2073
  if (pOperator->status == OP_EXEC_DONE) {
1,251,003✔
2074
    (*ppRes) = NULL;
627,490✔
2075
    return code;
627,490✔
2076
  }
2077

2078
  SSDataBlock* pRes = iaInfo->binfo.pRes;
623,513✔
2079
  blockDataCleanup(pRes);
623,513✔
2080

2081
  if (iaInfo->binfo.mergeResultBlock) {
623,522✔
2082
    while (1) {
2083
      if (pOperator->status == OP_EXEC_DONE) {
516,078✔
2084
        break;
204,410✔
2085
      }
2086

2087
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
311,668✔
2088
        break;
51,072✔
2089
      }
2090

2091
      doMergeAlignedIntervalAgg(pOperator);
260,596✔
2092
    }
2093
  } else {
2094
    doMergeAlignedIntervalAgg(pOperator);
368,041✔
2095
  }
2096

2097
  size_t rows = pRes->info.rows;
623,525✔
2098
  pOperator->resultInfo.totalRows += rows;
623,525✔
2099
  (*ppRes) = (rows == 0) ? NULL : pRes;
623,525✔
2100
  return code;
623,525✔
2101
}
2102

2103
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
543,949✔
2104
                                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2105
  QRY_PARAM_CHECK(pOptrInfo);
543,949!
2106

2107
  int32_t                               code = TSDB_CODE_SUCCESS;
543,949✔
2108
  int32_t                               lino = 0;
543,949✔
2109
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
543,949✔
2110
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
543,954✔
2111
  if (miaInfo == NULL || pOperator == NULL) {
543,948!
2112
    code = terrno;
2✔
2113
    goto _error;
×
2114
  }
2115

2116
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
543,946✔
2117
  if (miaInfo->intervalAggOperatorInfo == NULL) {
543,951!
2118
    code = terrno;
×
2119
    goto _error;
×
2120
  }
2121

2122
  SInterval interval = {.interval = pNode->interval,
543,951✔
2123
                        .sliding = pNode->sliding,
543,951✔
2124
                        .intervalUnit = pNode->intervalUnit,
543,951✔
2125
                        .slidingUnit = pNode->slidingUnit,
543,951✔
2126
                        .offset = pNode->offset,
543,951✔
2127
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
543,951✔
2128
                        .timeRange = pNode->timeRange};
2129
  calcIntervalAutoOffset(&interval);
543,951✔
2130

2131
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
543,952✔
2132
  SExprSupp*                pSup = &pOperator->exprSupp;
543,952✔
2133
  pSup->hasWindowOrGroup = true;
543,952✔
2134

2135
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
543,952✔
2136
  QUERY_CHECK_CODE(code, lino, _error);
543,952!
2137

2138
  miaInfo->curTs = INT64_MIN;
543,952✔
2139
  iaInfo->win = pTaskInfo->window;
543,952✔
2140
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
543,952✔
2141
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
543,952✔
2142
  iaInfo->interval = interval;
543,952✔
2143
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
543,952✔
2144
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
543,952✔
2145

2146
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
543,952✔
2147
  initResultSizeInfo(&pOperator->resultInfo, 512);
543,952✔
2148

2149
  int32_t    num = 0;
543,948✔
2150
  SExprInfo* pExprInfo = NULL;
543,948✔
2151
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
543,948✔
2152
  QUERY_CHECK_CODE(code, lino, _error);
543,957!
2153

2154
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
543,957✔
2155
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
543,957✔
2156
  QUERY_CHECK_CODE(code, lino, _error);
543,954!
2157

2158
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
543,954✔
2159
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
543,956!
2160
  initBasicInfo(&iaInfo->binfo, pResBlock);
543,956✔
2161
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
543,951✔
2162
  QUERY_CHECK_CODE(code, lino, _error);
543,956!
2163

2164
  iaInfo->timeWindowInterpo = false;
543,956✔
2165
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
543,956✔
2166
  QUERY_CHECK_CODE(code, lino, _error);
543,949!
2167
  if (iaInfo->timeWindowInterpo) {
543,949!
2168
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2169
  }
2170

2171
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
543,949✔
2172
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
543,942✔
2173
  QUERY_CHECK_CODE(code, lino, _error);
543,953!
2174
  iaInfo->pOperator = pOperator;
543,953✔
2175
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
543,953✔
2176
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2177

2178
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
543,956✔
2179
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2180

2181
  code = appendDownstream(pOperator, &downstream, 1);
543,955✔
2182
  QUERY_CHECK_CODE(code, lino, _error);
543,957!
2183

2184
  *pOptrInfo = pOperator;
543,957✔
2185
  return TSDB_CODE_SUCCESS;
543,957✔
2186

2187
_error:
×
2188
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2189
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2190
  pTaskInfo->code = code;
×
2191
  return code;
×
2192
}
2193

2194
//=====================================================================================================================
2195
// merge interval operator
2196
typedef struct SMergeIntervalAggOperatorInfo {
2197
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2198
  SList*                   groupIntervals;
2199
  SListIter                groupIntervalsIter;
2200
  bool                     hasGroupId;
2201
  uint64_t                 groupId;
2202
  SSDataBlock*             prefetchedBlock;
2203
  bool                     inputBlocksFinished;
2204
} SMergeIntervalAggOperatorInfo;
2205

2206
typedef struct SGroupTimeWindow {
2207
  uint64_t    groupId;
2208
  STimeWindow window;
2209
} SGroupTimeWindow;
2210

2211
void destroyMergeIntervalOperatorInfo(void* param) {
×
2212
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2213
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2214
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2215

2216
  taosMemoryFreeClear(param);
×
2217
}
×
2218

2219
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2220
                                        STimeWindow* newWin) {
2221
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2222
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2223
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2224

2225
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2226
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2227
  if (code != TSDB_CODE_SUCCESS) {
×
2228
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2229
    return code;
×
2230
  }
2231

2232
  SListIter iter = {0};
×
2233
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2234
  SListNode* listNode = NULL;
×
2235
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2236
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2237
    if (prevGrpWin->groupId != tableGroupId) {
×
2238
      continue;
×
2239
    }
2240

2241
    STimeWindow* prevWin = &prevGrpWin->window;
×
2242
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2243
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2244
      taosMemoryFreeClear(tmp);
×
2245
    }
2246
  }
2247

2248
  return TSDB_CODE_SUCCESS;
×
2249
}
2250

2251
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
2252
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
2253
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2254
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2255

2256
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
×
2257
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;
×
2258

2259
  int32_t     startPos = 0;
×
2260
  int32_t     numOfOutput = pExprSup->numOfExprs;
×
2261
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
×
2262
  uint64_t    tableGroupId = pBlock->info.id.groupId;
×
2263
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2264
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
×
2265
  SResultRow* pResult = NULL;
×
2266

2267
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
×
2268
                                        iaInfo->binfo.inputTsOrder);
2269

2270
  int32_t ret =
2271
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2272
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2273
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2274
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
×
2275
  }
2276

2277
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
×
2278
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2279
                                                 iaInfo->binfo.inputTsOrder);
2280
  if(forwardRows <= 0) {
×
2281
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2282
  }
2283

2284
  // prev time window not interpolation yet.
2285
  if (iaInfo->timeWindowInterpo) {
×
2286
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
×
2287
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
×
2288

2289
    // restore current time window
2290
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2291
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2292
    if (ret != TSDB_CODE_SUCCESS) {
×
2293
      T_LONG_JMP(pTaskInfo->env, ret);
×
2294
    }
2295

2296
    // window start key interpolation
2297
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
×
2298
    if (ret != TSDB_CODE_SUCCESS) {
×
2299
      T_LONG_JMP(pTaskInfo->env, ret);
×
2300
    }
2301
  }
2302

2303
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
×
2304
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
×
2305
                                  pBlock->info.rows, numOfOutput);
×
2306
  if (ret != TSDB_CODE_SUCCESS) {
×
2307
    T_LONG_JMP(pTaskInfo->env, ret);
×
2308
  }
2309
  doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2310

2311
  // output previous interval results after this interval (&win) is closed
2312
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
×
2313
  if (code != TSDB_CODE_SUCCESS) {
×
2314
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2315
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
×
2316
  }
2317

2318
  STimeWindow nextWin = win;
×
2319
  while (1) {
×
2320
    int32_t prevEndPos = forwardRows - 1 + startPos;
×
2321
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
×
2322
                                      iaInfo->binfo.inputTsOrder);
2323
    if (startPos < 0) {
×
2324
      break;
×
2325
    }
2326

2327
    // null data, failed to allocate more memory buffer
2328
    code =
2329
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
×
2330
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2331
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2332
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
×
2333
    }
2334

2335
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
×
2336
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2337
                                           iaInfo->binfo.inputTsOrder);
2338

2339
    // window start(end) key interpolation
2340
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
×
2341
    if (code != TSDB_CODE_SUCCESS) {
×
2342
      T_LONG_JMP(pTaskInfo->env, code);
×
2343
    }
2344

2345
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
×
2346
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2347
                                           forwardRows, pBlock->info.rows, numOfOutput);
×
2348
    if (code != TSDB_CODE_SUCCESS) {
×
2349
      T_LONG_JMP(pTaskInfo->env, code);
×
2350
    }
2351
    doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2352

2353
    // output previous interval results after this interval (&nextWin) is closed
2354
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
×
2355
    if (code != TSDB_CODE_SUCCESS) {
×
2356
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2357
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
×
2358
    }
2359
  }
2360

2361
  if (iaInfo->timeWindowInterpo) {
×
2362
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
×
2363
  }
2364
}
×
2365

2366
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
2367
  int32_t        code = TSDB_CODE_SUCCESS;
×
2368
  int32_t        lino = 0;
×
2369
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
2370

2371
  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
×
2372
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2373
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;
×
2374

2375
  if (pOperator->status == OP_EXEC_DONE) {
×
2376
    (*ppRes) = NULL;
×
2377
    return code;
×
2378
  }
2379

2380
  SSDataBlock* pRes = iaInfo->binfo.pRes;
×
2381
  blockDataCleanup(pRes);
×
2382
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
×
2383
  QUERY_CHECK_CODE(code, lino, _end);
×
2384

2385
  if (!miaInfo->inputBlocksFinished) {
×
2386
    SOperatorInfo* downstream = pOperator->pDownstream[0];
×
2387
    while (1) {
×
2388
      SSDataBlock* pBlock = NULL;
×
2389
      if (miaInfo->prefetchedBlock == NULL) {
×
2390
        pBlock = getNextBlockFromDownstream(pOperator, 0);
×
2391
      } else {
2392
        pBlock = miaInfo->prefetchedBlock;
×
2393
        miaInfo->groupId = pBlock->info.id.groupId;
×
2394
        miaInfo->prefetchedBlock = NULL;
×
2395
      }
2396

2397
      if (pBlock == NULL) {
×
2398
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
×
2399
        miaInfo->inputBlocksFinished = true;
×
2400
        break;
×
2401
      }
2402

2403
      if (!miaInfo->hasGroupId) {
×
2404
        miaInfo->hasGroupId = true;
×
2405
        miaInfo->groupId = pBlock->info.id.groupId;
×
2406
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
×
2407
        miaInfo->prefetchedBlock = pBlock;
×
2408
        break;
×
2409
      }
2410

2411
      pRes->info.scanFlag = pBlock->info.scanFlag;
×
2412
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
×
2413
      QUERY_CHECK_CODE(code, lino, _end);
×
2414

2415
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);
×
2416

2417
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
×
2418
        break;
×
2419
      }
2420
    }
2421

2422
    pRes->info.id.groupId = miaInfo->groupId;
×
2423
  }
2424

2425
  if (miaInfo->inputBlocksFinished) {
×
2426
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
×
2427

2428
    if (listNode != NULL) {
×
2429
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
×
2430
      pRes->info.id.groupId = grpWin->groupId;
×
2431
    }
2432
  }
2433

2434
  if (pRes->info.rows == 0) {
×
2435
    setOperatorCompleted(pOperator);
×
2436
  }
2437

2438
  size_t rows = pRes->info.rows;
×
2439
  pOperator->resultInfo.totalRows += rows;
×
2440

2441
_end:
×
2442
  if (code != TSDB_CODE_SUCCESS) {
×
2443
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2444
    pTaskInfo->code = code;
×
2445
    T_LONG_JMP(pTaskInfo->env, code);
×
2446
  }
2447
  (*ppRes) = (rows == 0) ? NULL : pRes;
×
2448
  return code;
×
2449
}
2450

2451
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2452
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2453
  QRY_PARAM_CHECK(pOptrInfo);
×
2454

2455
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2456
  int32_t                        lino = 0;
×
2457
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2458
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2459
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2460
    code = terrno;
×
2461
    goto _error;
×
2462
  }
2463

2464
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2465
                        .sliding = pIntervalPhyNode->sliding,
×
2466
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2467
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2468
                        .offset = pIntervalPhyNode->offset,
×
2469
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2470
                        .timeRange = pIntervalPhyNode->timeRange};
2471
  calcIntervalAutoOffset(&interval);
×
2472

2473
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2474

2475
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2476
  pIntervalInfo->win = pTaskInfo->window;
×
2477
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2478
  pIntervalInfo->interval = interval;
×
2479
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2480
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2481
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2482

2483
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2484
  pExprSupp->hasWindowOrGroup = true;
×
2485

2486
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2487
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2488

2489
  int32_t    num = 0;
×
2490
  SExprInfo* pExprInfo = NULL;
×
2491
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2492
  QUERY_CHECK_CODE(code, lino, _error);
×
2493

2494
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2495
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2496
  if (code != TSDB_CODE_SUCCESS) {
×
2497
    goto _error;
×
2498
  }
2499

2500
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2501
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2502
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2503
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2504
  QUERY_CHECK_CODE(code, lino, _error);
×
2505

2506
  pIntervalInfo->timeWindowInterpo = false;
×
2507
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2508
  QUERY_CHECK_CODE(code, lino, _error);
×
2509
  if (pIntervalInfo->timeWindowInterpo) {
×
2510
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2511
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2512
      goto _error;
×
2513
    }
2514
  }
2515

2516
  pIntervalInfo->pOperator = pOperator;
×
2517
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2518
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2519
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2520
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2521
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2522

2523
  code = appendDownstream(pOperator, &downstream, 1);
×
2524
  if (code != TSDB_CODE_SUCCESS) {
×
2525
    goto _error;
×
2526
  }
2527

2528
  *pOptrInfo = pOperator;
×
2529
  return TSDB_CODE_SUCCESS;
×
2530
_error:
×
2531
  if (pMergeIntervalInfo != NULL) {
×
2532
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2533
  }
2534
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2535
  pTaskInfo->code = code;
×
2536
  return code;
×
2537
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc