• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4747

21 Sep 2025 11:53PM UTC coverage: 58.002% (-1.1%) from 59.065%
#4747

push

travis-ci

web-flow
fix: refine python taos error log matching in checkAsan.sh (#33029)

* fix: refine python taos error log matching in checkAsan.sh

* fix: improve python taos error log matching in checkAsan.sh

133398 of 293157 branches covered (45.5%)

Branch coverage included in aggregate %.

201778 of 284713 relevant lines covered (70.87%)

5539418.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.82
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,
34
  RESULT_ROW_END_INTERP = 2,
35
} SResultTsInterpType;
36

37
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;
39
  uint64_t           groupId;
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
47,136,678✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
47,136,678✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
47,450,420!
56
    *pResult = NULL;
×
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
×
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
47,456,856✔
63
  *pResult = pResultRow;
47,456,856✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
47,456,856✔
65
}
66

67
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
1,161,837✔
68
  pRowSup->win.ekey = ts;
1,161,837✔
69
  pRowSup->prevTs = ts;
1,161,837✔
70
  pRowSup->numOfRows += 1;
1,161,837✔
71
  pRowSup->groupId = groupId;
1,161,837✔
72
}
1,161,837✔
73

74
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
476,625✔
75
  pRowSup->startRowIndex = rowIndex;
476,625✔
76
  pRowSup->numOfRows = 0;
476,625✔
77
  pRowSup->win.skey = tsList[rowIndex];
476,625✔
78
  pRowSup->groupId = groupId;
476,625✔
79
}
476,625✔
80

81
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
×
82
                                            int32_t order, int64_t* pData) {
83
  int32_t forwardRows = 0;
45,855,722✔
84

85
  if (order == TSDB_ORDER_ASC) {
×
86
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
45,523,408✔
87
    if (end >= 0) {
44,874,253!
88
      forwardRows = end;
44,900,787✔
89

90
      while (pData[end + pos] == ekey) {
44,915,147!
91
        forwardRows += 1;
14,360✔
92
        ++pos;
14,360✔
93
      }
94
    }
95
  } else {
96
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
332,314✔
97
    if (end >= 0) {
403,969!
98
      forwardRows = end;
403,973✔
99

100
      while (pData[end + pos] == ekey) {
764,764!
101
        forwardRows += 1;
360,791✔
102
        ++pos;
360,791✔
103
      }
104
    }
105
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
106
    //    if (end >= 0) {
107
    //      forwardRows = pos - end;
108
    //
109
    //      if (pData[end] == ekey) {
110
    //        forwardRows += 1;
111
    //      }
112
    //    }
113
  }
114

115
  return forwardRows;
45,278,222✔
116
}
117

118
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
45,914,249✔
119
  int32_t midPos = -1;
45,914,249✔
120
  int32_t numOfRows;
121

122
  if (num <= 0) {
45,914,249!
123
    return -1;
×
124
  }
125

126
  TSKEY*  keyList = (TSKEY*)pValue;
45,914,249✔
127
  int32_t firstPos = 0;
45,914,249✔
128
  int32_t lastPos = num - 1;
45,914,249✔
129

130
  if (order == TSDB_ORDER_DESC) {
45,914,249✔
131
    // find the first position which is smaller than the key
132
    while (1) {
133
      if (key >= keyList[firstPos]) return firstPos;
1,473,239✔
134
      if (key == keyList[lastPos]) return lastPos;
1,167,703✔
135

136
      if (key < keyList[lastPos]) {
1,143,101✔
137
        lastPos += 1;
37,543✔
138
        if (lastPos >= num) {
37,543!
139
          return -1;
×
140
        } else {
141
          return lastPos;
37,543✔
142
        }
143
      }
144

145
      numOfRows = lastPos - firstPos + 1;
1,105,558✔
146
      midPos = (numOfRows >> 1) + firstPos;
1,105,558✔
147

148
      if (key < keyList[midPos]) {
1,105,558✔
149
        firstPos = midPos + 1;
163,397✔
150
      } else if (key > keyList[midPos]) {
942,161✔
151
        lastPos = midPos - 1;
902,659✔
152
      } else {
153
        break;
39,502✔
154
      }
155
    }
156

157
  } else {
158
    // find the first position which is bigger than the key
159
    while (1) {
160
      if (key <= keyList[firstPos]) return firstPos;
375,529,780✔
161
      if (key == keyList[lastPos]) return lastPos;
359,801,558✔
162

163
      if (key > keyList[lastPos]) {
359,798,233✔
164
        lastPos = lastPos + 1;
29,635,073✔
165
        if (lastPos >= num)
29,635,073!
166
          return -1;
×
167
        else
168
          return lastPos;
29,635,073✔
169
      }
170

171
      numOfRows = lastPos - firstPos + 1;
330,163,160✔
172
      midPos = (numOfRows >> 1u) + firstPos;
330,163,160✔
173

174
      if (key < keyList[midPos]) {
330,163,160✔
175
        lastPos = midPos - 1;
291,039,266✔
176
      } else if (key > keyList[midPos]) {
39,123,894✔
177
        firstPos = midPos + 1;
38,983,448✔
178
      } else {
179
        break;
140,446✔
180
      }
181
    }
182
  }
183

184
  return midPos;
179,948✔
185
}
186

187
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
46,571,134✔
188
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
189
  int32_t num = -1;
46,571,134✔
190
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
46,571,134✔
191

192
  if (order == TSDB_ORDER_ASC) {
46,571,134✔
193
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
46,175,592!
194
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
45,451,914!
195
      if (item != NULL) {
44,874,253!
196
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
197
      }
198
    } else {
199
      num = pDataBlockInfo->rows - startPos;
723,678✔
200
      if (item != NULL) {
723,678!
201
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
202
      }
203
    }
204
  } else {  // desc
205
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
395,542!
206
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
403,808!
207
      if (item != NULL) {
403,969!
208
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
209
      }
210
    } else {
211
      num = pDataBlockInfo->rows - startPos;
×
212
      if (item != NULL) {
×
213
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
214
      }
215
    }
216
  }
217

218
  return num;
45,993,634✔
219
}
220

221
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
19,456✔
222
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
223
  SqlFunctionCtx* pCtx = pSup->pCtx;
19,456✔
224

225
  int32_t index = 1;
19,456✔
226
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
58,193✔
227
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
38,737✔
228
      pCtx[k].start.key = INT64_MIN;
19,281✔
229
      continue;
19,281✔
230
    }
231

232
    SFunctParam*     pParam = &pCtx[k].param[0];
19,456✔
233
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
19,456✔
234

235
    double v1 = 0, v2 = 0, v = 0;
19,456✔
236
    if (prevRowIndex == -1) {
19,456!
237
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
×
238
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
×
239
    } else {
240
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
19,456!
241
                     typeGetTypeModFromColInfo(&pColInfo->info));
242
    }
243

244
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
19,456!
245
                   typeGetTypeModFromColInfo(&pColInfo->info));
246

247
#if 0
248
    if (functionId == FUNCTION_INTERP) {
249
      if (type == RESULT_ROW_START_INTERP) {
250
        pCtx[k].start.key = prevTs;
251
        pCtx[k].start.val = v1;
252

253
        pCtx[k].end.key = curTs;
254
        pCtx[k].end.val = v2;
255

256
        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_VARBINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR ||
257
            pColInfo->info.type == TSDB_DATA_TYPE_GEOMETRY) {
258
          if (prevRowIndex == -1) {
259
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
260
          } else {
261
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
262
          }
263

264
          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
265
        }
266
      }
267
    } else if (functionId == FUNCTION_TWA) {
268
#endif
269

270
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
19,456✔
271
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
19,456✔
272
    SPoint point = (SPoint){.key = windowKey, .val = &v};
19,456✔
273

274
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
19,456✔
275
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
19,021✔
276
    }
277

278
    if (type == RESULT_ROW_START_INTERP) {
19,456✔
279
      pCtx[k].start.key = point.key;
9,508✔
280
      pCtx[k].start.val = v;
9,508✔
281
    } else {
282
      pCtx[k].end.key = point.key;
9,948✔
283
      pCtx[k].end.val = v;
9,948✔
284
    }
285

286
    index += 1;
19,456✔
287
  }
288
#if 0
289
  }
290
#endif
291
}
19,456✔
292

293
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
1,720✔
294
  if (type == RESULT_ROW_START_INTERP) {
1,720✔
295
    for (int32_t k = 0; k < numOfOutput; ++k) {
2,926✔
296
      pCtx[k].start.key = INT64_MIN;
1,846✔
297
    }
298
  } else {
299
    for (int32_t k = 0; k < numOfOutput; ++k) {
1,887✔
300
      pCtx[k].end.key = INT64_MIN;
1,247✔
301
    }
302
  }
303
}
1,720✔
304

305
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
10,584✔
306
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
307
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
10,584✔
308

309
  TSKEY curTs = tsCols[pos];
10,584✔
310

311
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
10,584✔
312
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
10,584✔
313

314
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
315
  // start exactly from this point, no need to do interpolation
316
  TSKEY key = ascQuery ? win->skey : win->ekey;
10,584!
317
  if (key == curTs) {
10,584✔
318
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
1,058✔
319
    return true;
1,058✔
320
  }
321

322
  // it is the first time window, no need to do interpolation
323
  if (pTsKey->isNull && pos == 0) {
9,526!
324
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
18✔
325
  } else {
326
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
9,508!
327
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
9,508✔
328
                              RESULT_ROW_START_INTERP, pSup);
329
  }
330

331
  return true;
9,526✔
332
}
333

334
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
10,588✔
335
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
336
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
337
  int32_t code = TSDB_CODE_SUCCESS;
10,588✔
338
  int32_t lino = 0;
10,588✔
339
  int32_t order = pInfo->binfo.inputTsOrder;
10,588✔
340

341
  TSKEY actualEndKey = tsCols[endRowIndex];
10,588✔
342
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
10,588!
343

344
  // not ended in current data block, do not invoke interpolation
345
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
10,588!
346
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
114✔
347
    (*pRes) = false;
114✔
348
    return code;
114✔
349
  }
350

351
  // there is actual end point of current time window, no interpolation needs
352
  if (key == actualEndKey) {
10,474✔
353
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
526✔
354
    (*pRes) = true;
526✔
355
    return code;
526✔
356
  }
357

358
  if (nextRowIndex < 0) {
9,948!
359
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
360
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
361
  }
362

363
  TSKEY nextKey = tsCols[nextRowIndex];
9,948✔
364
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
9,948✔
365
                            RESULT_ROW_END_INTERP, pSup);
366
  (*pRes) = true;
9,948✔
367
  return code;
9,948✔
368
}
369

370
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
45,990,683✔
371
  if (pInterval->interval != pInterval->sliding &&
45,990,683✔
372
      ((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart))) {
26,781!
373
    return false;
×
374
  }
375

376
  return true;
45,990,683✔
377
}
378

379
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
45,949,246✔
380
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
45,949,246✔
381
}
382

383
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
46,858,988✔
384
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
385
  bool ascQuery = (order == TSDB_ORDER_ASC);
46,858,988✔
386

387
  int32_t precision = pInterval->precision;
46,858,988✔
388
  getNextTimeWindow(pInterval, pNext, order);
46,858,988✔
389

390
  // next time window is not in current block
391
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
46,682,841!
392
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
45,915,809!
393
    return -1;
777,293✔
394
  }
395

396
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
45,905,548!
397
    return -1;
×
398
  }
399

400
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
45,954,610✔
401
  int32_t startPos = 0;
45,954,610✔
402

403
  // tumbling time window query, a special case of sliding time window query
404
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
45,954,610!
405
    startPos = prevPosition + 1;
45,945,121✔
406
  } else {
407
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
9,489!
408
      startPos = 0;
11,967✔
409
    } else {
410
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
×
411
    }
412
  }
413

414
  /* interp query with fill should not skip time window */
415
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
416
  //    return startPos;
417
  //  }
418

419
  /*
420
   * This time window does not cover any data, try next time window,
421
   * this case may happen when the time window is too small
422
   */
423
  if (primaryKeys != NULL) {
45,969,875!
424
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
59,897,284✔
425
      TSKEY next = primaryKeys[startPos];
13,925,633✔
426
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
13,925,633✔
427
        pNext->skey = taosTimeTruncate(next, pInterval);
7,803✔
428
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
4✔
429
      } else {
430
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
13,917,830✔
431
        pNext->skey = pNext->ekey - pInterval->interval + 1;
13,917,830✔
432
      }
433
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
32,046,589✔
434
      TSKEY next = primaryKeys[startPos];
276,711✔
435
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
276,711!
436
        pNext->skey = taosTimeTruncate(next, pInterval);
×
437
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
438
      } else {
439
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
276,718✔
440
        pNext->ekey = pNext->skey + pInterval->interval - 1;
276,718✔
441
      }
442
    }
443
  }
444

445
  return startPos;
45,969,311✔
446
}
447

448
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
31,764✔
449
  if (type == RESULT_ROW_START_INTERP) {
31,764✔
450
    return pResult->startInterp == true;
10,588✔
451
  } else {
452
    return pResult->endInterp == true;
21,176✔
453
  }
454
}
455

456
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
21,058✔
457
  if (type == RESULT_ROW_START_INTERP) {
21,058✔
458
    pResult->startInterp = true;
10,584✔
459
  } else {
460
    pResult->endInterp = true;
10,474✔
461
  }
462
}
21,058✔
463

464
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
45,420,574✔
465
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
466
  int32_t code = TSDB_CODE_SUCCESS;
45,420,574✔
467
  int32_t lino = 0;
45,420,574✔
468
  if (!pInfo->timeWindowInterpo) {
45,420,574!
469
    return code;
45,428,839✔
470
  }
471

472
  if (pBlock == NULL) {
×
473
    code = TSDB_CODE_INVALID_PARA;
×
474
    return code;
×
475
  }
476

477
  if (pBlock->pDataBlock == NULL) {
×
478
    return code;
×
479
  }
480

481
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
×
482

483
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
10,588✔
484
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
10,588✔
485
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
10,588✔
486
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
10,584✔
487
    if (interp) {
10,584!
488
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
10,584✔
489
    }
490
  } else {
491
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
4✔
492
  }
493

494
  // point interpolation does not require the end key time window interpolation.
495
  // interpolation query does not generate the time window end interpolation
496
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
10,588✔
497
  if (!done) {
10,588!
498
    int32_t endRowIndex = startPos + forwardRows - 1;
10,588✔
499
    int32_t nextRowIndex = endRowIndex + 1;
10,588✔
500

501
    // duplicated ts row does not involve in the interpolation of end value for current time window
502
    int32_t x = endRowIndex;
10,588✔
503
    while (x > 0) {
10,641✔
504
      if (tsCols[x] == tsCols[x - 1]) {
10,561✔
505
        x -= 1;
53✔
506
      } else {
507
        endRowIndex = x;
10,508✔
508
        break;
10,508✔
509
      }
510
    }
511

512
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
10,588!
513
    bool  interp = false;
10,588✔
514
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
10,588✔
515
                                           win, &interp);
516
    QUERY_CHECK_CODE(code, lino, _end);
10,588!
517
    if (interp) {
10,588✔
518
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
10,474✔
519
    }
520
  } else {
521
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
522
  }
523

524
_end:
10,588✔
525
  if (code != TSDB_CODE_SUCCESS) {
10,588!
526
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
527
  }
528
  return code;
10,588✔
529
}
530

531
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
114✔
532
  if (pBlock->pDataBlock == NULL) {
114!
533
    return;
×
534
  }
535

536
  size_t num = taosArrayGetSize(pPrevKeys);
114✔
537
  for (int32_t k = 0; k < num; ++k) {
342✔
538
    SColumn* pc = taosArrayGet(pCols, k);
228✔
539

540
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);
228✔
541

542
    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
228✔
543
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
228!
544
      if (colDataIsNull_s(pColInfo, i)) {
456!
545
        continue;
×
546
      }
547

548
      char* val = colDataGetData(pColInfo, i);
228!
549
      if (IS_VAR_DATA_TYPE(pkey->type)) {
228!
550
        if (IS_STR_DATA_BLOB(pkey->type)) {
×
551
          memcpy(pkey->pData, val, blobDataTLen(val));
×
552
        } else {
553
          memcpy(pkey->pData, val, varDataTLen(val));
×
554
        }
555
      } else {
556
        memcpy(pkey->pData, val, pkey->bytes);
228✔
557
      }
558

559
      break;
228✔
560
    }
561
  }
562
}
563

564
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
114✔
565
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
566
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
114✔
567

568
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
114✔
569
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
114✔
570

571
  int32_t startPos = 0;
114✔
572
  int32_t numOfOutput = pSup->numOfExprs;
114✔
573

574
  SResultRow* pResult = NULL;
114✔
575

576
  while (1) {
×
577
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
114✔
578
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
114✔
579
    uint64_t            groupId = pOpenWin->groupId;
114✔
580
    SResultRowPosition* p1 = &pOpenWin->pos;
114✔
581
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
114!
582
      break;
114✔
583
    }
584

585
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
586
    if (NULL == pr) {
×
587
      T_LONG_JMP(pTaskInfo->env, terrno);
×
588
    }
589

590
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
591
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
592
      T_LONG_JMP(pTaskInfo->env, terrno);
×
593
    }
594

595
    if (pr->closed) {
×
596
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
597
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
598
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
599
        T_LONG_JMP(pTaskInfo->env, terrno);
×
600
      }
601
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
602
      taosMemoryFree(pNode);
×
603
      continue;
×
604
    }
605

606
    STimeWindow w = pr->win;
×
607
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
608
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
609
    if (ret != TSDB_CODE_SUCCESS) {
×
610
      T_LONG_JMP(pTaskInfo->env, ret);
×
611
    }
612

613
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
614
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
615
      T_LONG_JMP(pTaskInfo->env, terrno);
×
616
    }
617

618
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
619
    if (!pTsKey) {
×
620
      pTaskInfo->code = terrno;
×
621
      T_LONG_JMP(pTaskInfo->env, terrno);
×
622
    }
623

624
    int64_t prevTs = *(int64_t*)pTsKey->pData;
×
625
    if (groupId == pBlock->info.id.groupId) {
×
626
      TSKEY curTs = pBlock->info.window.skey;
×
627
      if (tsCols != NULL) {
×
628
        curTs = tsCols[startPos];
×
629
      }
630
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
631
                                RESULT_ROW_END_INTERP, pSup);
632
    }
633

634
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
635
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
636

637
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
638
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
639
                                          pBlock->info.rows, numOfExprs);
×
640
    if (ret != TSDB_CODE_SUCCESS) {
×
641
      T_LONG_JMP(pTaskInfo->env, ret);
×
642
    }
643

644
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
645
      closeResultRow(pr);
×
646
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
647
      taosMemoryFree(pNode);
×
648
    } else {  // the remains are can not be closed yet.
649
      break;
×
650
    }
651
  }
652
}
114✔
653

654
static bool tsKeyCompFn(void* l, void* r, void* param) {
2,108,319✔
655
  TSKEY*                    lTS = (TSKEY*)l;
2,108,319✔
656
  TSKEY*                    rTS = (TSKEY*)r;
2,108,319✔
657
  SIntervalAggOperatorInfo* pInfo = param;
2,108,319✔
658
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
2,108,319!
659
}
660

661
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
210,825✔
662
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
210,825✔
663
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
210,825✔
664
  return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
210,825✔
665
}
666

667
/**
668
 * @brief check if cur window should be filtered out by limit info
669
 * @retval true if should be filtered out
670
 * @retval false if not filtering out
671
 * @note If no limit info, we skip filtering.
672
 *       If input/output ts order mismatch, we skip filtering too.
673
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
674
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
675
 *       every tuple in every block.
676
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
677
 */
678
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
46,791,346✔
679
                                  SExecTaskInfo* pTaskInfo) {
680
  int32_t code = TSDB_CODE_SUCCESS;
46,791,346✔
681
  int32_t lino = 0;
46,791,346✔
682
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
46,791,346✔
683
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
492,374✔
684
      // if input/output ts order mismatch, no filter
685
  ) {
686
    return false;
46,494,861✔
687
  }
688

689
  if (pOperatorInfo->limit == 0) return true;
296,485✔
690

691
  if (pOperatorInfo->pBQ == NULL) {
296,481✔
692
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
14,070✔
693
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
14,106!
694
  }
695

696
  bool shouldFilter = false;
296,517✔
697
  // if BQ has been full, compare it with top of BQ
698
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
296,517✔
699
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
86,463✔
700
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
86,463✔
701
  }
702
  if (shouldFilter) {
296,486✔
703
    return true;
85,661✔
704
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
210,825✔
705
    return false;
4,085✔
706
  }
707

708
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
709
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
206,727!
710
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
206,836!
711

712
  *((TSKEY*)node.data) = win->skey;
206,836✔
713

714
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
206,836✔
715
    taosMemoryFree(node.data);
3!
716
    return true;
×
717
  }
718

719
_end:
206,712✔
720
  if (code != TSDB_CODE_SUCCESS) {
206,712!
721
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
722
    pTaskInfo->code = code;
×
723
    T_LONG_JMP(pTaskInfo->env, code);
×
724
  }
725
  return false;
206,712✔
726
}
727

728
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
863,021✔
729
                            int32_t scanFlag) {
730
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
863,021✔
731

732
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
863,021✔
733
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
863,021✔
734

735
  int32_t     startPos = 0;
863,021✔
736
  int32_t     numOfOutput = pSup->numOfExprs;
863,021✔
737
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
863,021✔
738
  uint64_t    tableGroupId = pBlock->info.id.groupId;
862,900✔
739
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
862,900✔
740
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
862,900✔
741
  SResultRow* pResult = NULL;
862,866✔
742

743
  if (tableGroupId != pInfo->curGroupId) {
862,866✔
744
    pInfo->handledGroupNum += 1;
91,230✔
745
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
91,230✔
746
      return true;
28✔
747
    } else {
748
      pInfo->curGroupId = tableGroupId;
91,202✔
749
      destroyBoundedQueue(pInfo->pBQ);
91,202✔
750
      pInfo->pBQ = NULL;
91,208✔
751
    }
752
  }
753

754
  STimeWindow win =
755
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
862,844✔
756
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;
862,596✔
757

758
  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
779,768✔
759
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
760
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
780,025!
761
    T_LONG_JMP(pTaskInfo->env, ret);
22!
762
  }
763

764
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
780,003✔
765
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
780,003✔
766
                                                 pInfo->binfo.inputTsOrder);
767

768
  // prev time window not interpolation yet.
769
  if (pInfo->timeWindowInterpo) {
779,440✔
770
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
114✔
771
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
114✔
772

773
    // restore current time window
774
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
114✔
775
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
776
    if (ret != TSDB_CODE_SUCCESS) {
114!
777
      T_LONG_JMP(pTaskInfo->env, ret);
×
778
    }
779

780
    // window start key interpolation
781
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
114✔
782
    if (ret != TSDB_CODE_SUCCESS) {
114!
783
      T_LONG_JMP(pTaskInfo->env, ret);
×
784
    }
785
  }
786
  // qDebug("hashIntervalAgg1 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
787
  //   win.skey, win.ekey, startPos, forwardRows);
788
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
779,440✔
789
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
779,423✔
790
                                        pBlock->info.rows, numOfOutput);
779,423✔
791
  if (ret != TSDB_CODE_SUCCESS) {
780,111!
792
    T_LONG_JMP(pTaskInfo->env, ret);
×
793
  }
794

795
  doCloseWindow(pResultRowInfo, pInfo, pResult);
780,111✔
796

797
  STimeWindow nextWin = win;
780,065✔
798
  while (1) {
46,098,210✔
799
    int32_t prevEndPos = forwardRows - 1 + startPos;
46,878,275✔
800
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
46,878,275✔
801
                                      pInfo->binfo.inputTsOrder);
802
    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
46,707,680!
803
      break;
804
    }
805
    // null data, failed to allocate more memory buffer
806
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
45,928,005✔
807
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
808
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
45,705,175!
809
      T_LONG_JMP(pTaskInfo->env, code);
×
810
    }
811

812
    // qDebug("hashIntervalAgg2 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
813
      // nextWin.skey, nextWin.ekey, startPos, forwardRows);
814

815
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
45,724,144✔
816
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
45,724,144✔
817
                                           pInfo->binfo.inputTsOrder);
818
    // window start(end) key interpolation
819
    code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
45,324,532✔
820
    if (code != TSDB_CODE_SUCCESS) {
45,431,388!
821
      T_LONG_JMP(pTaskInfo->env, code);
×
822
    }
823
    // TODO: add to open window? how to close the open windows after input blocks exhausted?
824
#if 0
825
    if ((ascScan && ekey <= pBlock->info.window.ekey) ||
826
        (!ascScan && ekey >= pBlock->info.window.skey)) {
827
      // window start(end) key interpolation
828
      doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
829
    } else if (pInfo->timeWindowInterpo) {
830
      addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
831
    }
832
#endif
833
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
45,431,388✔
834
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
45,444,654✔
835
                                          pBlock->info.rows, numOfOutput);
45,444,654✔
836
    if (ret != TSDB_CODE_SUCCESS) {
46,123,228!
837
      T_LONG_JMP(pTaskInfo->env, ret);
×
838
    }
839
    doCloseWindow(pResultRowInfo, pInfo, pResult);
46,123,228✔
840
  }
841

842
  if (pInfo->timeWindowInterpo) {
753,901✔
843
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
114✔
844
  }
845
  return false;
780,170✔
846
}
847

848
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
46,880,070✔
849
  // current result is done in computing final results.
850
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
46,880,070✔
851
    closeResultRow(pResult);
10,474✔
852
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
10,474✔
853
    taosMemoryFree(pNode);
10,474!
854
  }
855
}
46,880,070✔
856

857
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
114✔
858
                                       SExecTaskInfo* pTaskInfo) {
859
  int32_t         code = TSDB_CODE_SUCCESS;
114✔
860
  int32_t         lino = 0;
114✔
861
  SOpenWindowInfo openWin = {0};
114✔
862
  openWin.pos.pageId = pResult->pageId;
114✔
863
  openWin.pos.offset = pResult->offset;
114✔
864
  openWin.groupId = groupId;
114✔
865
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
114✔
866
  if (pn == NULL) {
114✔
867
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
112✔
868
    QUERY_CHECK_CODE(code, lino, _end);
112!
869
    return openWin.pos;
112✔
870
  }
871

872
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
2✔
873
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
2!
874
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
875
    QUERY_CHECK_CODE(code, lino, _end);
×
876
  }
877

878
_end:
2✔
879
  if (code != TSDB_CODE_SUCCESS) {
2!
880
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
881
    pTaskInfo->code = code;
×
882
    T_LONG_JMP(pTaskInfo->env, code);
×
883
  }
884
  return openWin.pos;
2✔
885
}
886

887
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
879,440✔
888
  TSKEY* tsCols = NULL;
879,440✔
889

890
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
879,440!
891
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
879,458✔
892
    if (!pColDataInfo) {
879,354!
893
      pTaskInfo->code = terrno;
×
894
      T_LONG_JMP(pTaskInfo->env, terrno);
×
895
    }
896

897
    tsCols = (int64_t*)pColDataInfo->pData;
879,354✔
898
    if (tsCols[0] == 0) {
879,354✔
899
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
2!
900
            tsCols[pBlock->info.rows - 1]);
901
    }
902

903
    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
879,350!
904
      int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
52,319✔
905
      if (code != TSDB_CODE_SUCCESS) {
52,319!
906
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
907
        pTaskInfo->code = code;
×
908
        T_LONG_JMP(pTaskInfo->env, code);
×
909
      }
910
    }
911
  }
912

913
  return tsCols;
879,332✔
914
}
915

916
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
149,345✔
917
  if (OPTR_IS_OPENED(pOperator)) {
149,345✔
918
    return TSDB_CODE_SUCCESS;
132,676✔
919
  }
920

921
  int32_t        code = TSDB_CODE_SUCCESS;
16,669✔
922
  int32_t        lino = 0;
16,669✔
923
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
16,669✔
924
  SOperatorInfo* downstream = pOperator->pDownstream[0];
16,669✔
925

926
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
16,669✔
927
  SExprSupp*                pSup = &pOperator->exprSupp;
16,669✔
928

929
  int32_t scanFlag = MAIN_SCAN;
16,669✔
930
  int64_t st = taosGetTimestampUs();
16,670✔
931

932
  pInfo->cleanGroupResInfo = false;
16,670✔
933
  while (1) {
862,938✔
934
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
879,608✔
935
    if (pBlock == NULL) {
879,323✔
936
      break;
16,638✔
937
    }
938

939
    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
862,685✔
940

941
    if (pInfo->scalarSupp.pExprInfo != NULL) {
862,685✔
942
      SExprSupp* pExprSup = &pInfo->scalarSupp;
103,265✔
943
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
103,265✔
944
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
103,265!
945
      QUERY_CHECK_CODE(code, lino, _end);
103,264!
946
    }
947

948
    // the pDataBlock are always the same one, no need to call this again
949
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
862,684✔
950
    QUERY_CHECK_CODE(code, lino, _end);
863,050!
951
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
863,050✔
952
  }
953

954
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
16,666✔
955
  QUERY_CHECK_CODE(code, lino, _end);
16,666!
956
  pInfo->cleanGroupResInfo = true;
16,666✔
957

958
  OPTR_SET_OPENED(pOperator);
16,666✔
959

960
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
16,667✔
961

962
_end:
16,667✔
963
  if (code != TSDB_CODE_SUCCESS) {
16,667!
964
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
965
    pTaskInfo->code = code;
×
966
    T_LONG_JMP(pTaskInfo->env, code);
×
967
  }
968
  return code;
16,667✔
969
}
970

971
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
6,292✔
972
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
6,292✔
973
  SExprSupp*     pSup = &pOperator->exprSupp;
6,292✔
974

975
  SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
6,292✔
976
  if (!pStateColInfoData) {
6,292!
977
    pTaskInfo->code = terrno;
×
978
    T_LONG_JMP(pTaskInfo->env, terrno);
×
979
  }
980
  int64_t gid = pBlock->info.id.groupId;
6,292✔
981

982
  bool    hasResult = false;
6,292✔
983
  bool    masterScan = true;
6,292✔
984
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
6,292✔
985
  int32_t bytes = pStateColInfoData->info.bytes;
6,292✔
986

987
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
6,292✔
988
  if (!pColInfoData) {
6,292!
989
    pTaskInfo->code = terrno;
×
990
    T_LONG_JMP(pTaskInfo->env, terrno);
×
991
  }
992
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
6,292✔
993

994
  SWindowRowsSup* pRowSup = &pInfo->winSup;
6,292✔
995
  pRowSup->numOfRows = 0;
6,292✔
996
  pRowSup->startRowIndex = 0;
6,292✔
997

998
  struct SColumnDataAgg* pAgg = NULL;
6,292✔
999
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
496,237✔
1000
    pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
489,945!
1001
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
979,890✔
1002
      continue;
164,530✔
1003
    }
1004
    hasResult = true;
325,415✔
1005
    if (pStateColInfoData->pData == NULL) {
325,415!
1006
      qError("%s:%d state column data is null", __FILE__, __LINE__);
×
1007
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1008
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1009
    }
1010

1011
    char* val = colDataGetData(pStateColInfoData, j);
325,415!
1012

1013
    if (gid != pRowSup->groupId || !pInfo->hasKey) {
325,415✔
1014
      // todo extract method
1015
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
5,529!
1016
        if (IS_STR_DATA_BLOB(pInfo->stateKey.type)) {
118!
1017
          blobDataCopy(pInfo->stateKey.pData, val);
×
1018
        } else {
1019
          varDataCopy(pInfo->stateKey.pData, val);
118✔
1020
        }
1021
      } else {
1022
        memcpy(pInfo->stateKey.pData, val, bytes);
5,411✔
1023
      }
1024

1025
      pInfo->hasKey = true;
5,529✔
1026

1027
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
5,529✔
1028
      doKeepTuple(pRowSup, tsList[j], gid);
5,529✔
1029
    } else if (compareVal(val, &pInfo->stateKey)) {
319,886✔
1030
      doKeepTuple(pRowSup, tsList[j], gid);
164,918✔
1031
    } else {  // a new state window started
1032
      SResultRow* pResult = NULL;
154,968✔
1033

1034
      // keep the time window for the closed time window.
1035
      STimeWindow window = pRowSup->win;
154,968✔
1036

1037
      pRowSup->win.ekey = pRowSup->win.skey;
154,968✔
1038
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
154,968✔
1039
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1040
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
154,968!
1041
        T_LONG_JMP(pTaskInfo->env, ret);
×
1042
      }
1043

1044
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
154,968✔
1045
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
154,968✔
1046
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
154,968✔
1047
      if (ret != TSDB_CODE_SUCCESS) {
154,968!
1048
        T_LONG_JMP(pTaskInfo->env, ret);
×
1049
      }
1050

1051
      // here we start a new session window
1052
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
154,968✔
1053
      doKeepTuple(pRowSup, tsList[j], gid);
154,968✔
1054

1055
      // todo extract method
1056
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
154,968!
1057
        if (IS_STR_DATA_BLOB(pInfo->stateKey.type)) {
9!
1058
          blobDataCopy(pInfo->stateKey.pData, val);
×
1059
        } else {
1060
          varDataCopy(pInfo->stateKey.pData, val);
9✔
1061
        }
1062
      } else {
1063
        memcpy(pInfo->stateKey.pData, val, bytes);
154,959✔
1064
      }
1065
    }
1066
  }
1067

1068
  if (!hasResult) {
6,292✔
1069
    return;
467✔
1070
  }
1071
  SResultRow* pResult = NULL;
5,825✔
1072
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
5,825✔
1073
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
5,825✔
1074
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1075
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
5,825!
1076
    T_LONG_JMP(pTaskInfo->env, ret);
×
1077
  }
1078

1079
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
5,825✔
1080
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
5,825✔
1081
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
5,825✔
1082
  if (ret != TSDB_CODE_SUCCESS) {
5,825✔
1083
    T_LONG_JMP(pTaskInfo->env, ret);
2!
1084
  }
1085
}
1086

1087
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
3,284✔
1088
  if (OPTR_IS_OPENED(pOperator)) {
3,284✔
1089
    return TSDB_CODE_SUCCESS;
455✔
1090
  }
1091

1092
  int32_t                   code = TSDB_CODE_SUCCESS;
2,829✔
1093
  int32_t                   lino = 0;
2,829✔
1094
  SStateWindowOperatorInfo* pInfo = pOperator->info;
2,829✔
1095
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
2,829✔
1096

1097
  SExprSupp* pSup = &pOperator->exprSupp;
2,829✔
1098
  int32_t    order = pInfo->binfo.inputTsOrder;
2,829✔
1099
  int64_t    st = taosGetTimestampUs();
2,829✔
1100

1101
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2,829✔
1102
  pInfo->cleanGroupResInfo = false;
2,829✔
1103
  while (1) {
6,290✔
1104
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
9,119✔
1105
    if (pBlock == NULL) {
9,119✔
1106
      break;
2,827✔
1107
    }
1108

1109
    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
6,292✔
1110
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
6,292✔
1111
    QUERY_CHECK_CODE(code, lino, _end);
6,292!
1112

1113
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
6,292✔
1114
    QUERY_CHECK_CODE(code, lino, _end);
6,292!
1115

1116
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
1117
    if (pInfo->scalarSup.pExprInfo != NULL) {
6,292✔
1118
      pTaskInfo->code =
2,290✔
1119
          projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
2,290✔
1120
                                pInfo->scalarSup.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
2,290!
1121
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
2,290!
1122
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1123
      }
1124
    }
1125

1126
    doStateWindowAggImpl(pOperator, pInfo, pBlock);
6,292✔
1127
  }
1128

1129
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
2,827✔
1130
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
2,827✔
1131
  QUERY_CHECK_CODE(code, lino, _end);
2,827!
1132
  pInfo->cleanGroupResInfo = true;
2,827✔
1133
  pOperator->status = OP_RES_TO_RETURN;
2,827✔
1134

1135
_end:
2,827✔
1136
  if (code != TSDB_CODE_SUCCESS) {
2,827!
1137
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1138
    pTaskInfo->code = code;
×
1139
    T_LONG_JMP(pTaskInfo->env, code);
×
1140
  }
1141
  return code;
2,827✔
1142
}
1143

1144
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
5,471✔
1145
  if (pOperator->status == OP_EXEC_DONE) {
5,471✔
1146
    (*ppRes) = NULL;
2,187✔
1147
    return TSDB_CODE_SUCCESS;
2,187✔
1148
  }
1149

1150
  int32_t                   code = TSDB_CODE_SUCCESS;
3,284✔
1151
  int32_t                   lino = 0;
3,284✔
1152
  SStateWindowOperatorInfo* pInfo = pOperator->info;
3,284✔
1153
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
3,284✔
1154
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;
3,284✔
1155

1156
  code = pOperator->fpSet._openFn(pOperator);
3,284✔
1157
  QUERY_CHECK_CODE(code, lino, _end);
3,282!
1158

1159
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
3,282✔
1160
  QUERY_CHECK_CODE(code, lino, _end);
3,282!
1161

1162
  while (1) {
×
1163
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
3,282✔
1164
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
3,282✔
1165
    QUERY_CHECK_CODE(code, lino, _end);
3,282!
1166

1167
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
3,282✔
1168
    if (!hasRemain) {
3,282✔
1169
      setOperatorCompleted(pOperator);
2,807✔
1170
      break;
2,807✔
1171
    }
1172

1173
    if (pBInfo->pRes->info.rows > 0) {
475!
1174
      break;
475✔
1175
    }
1176
  }
1177

1178
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
3,282✔
1179

1180
_end:
3,282✔
1181
  if (code != TSDB_CODE_SUCCESS) {
3,282!
1182
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1183
    pTaskInfo->code = code;
×
1184
    T_LONG_JMP(pTaskInfo->env, code);
×
1185
  }
1186
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
3,282✔
1187
  return code;
3,282✔
1188
}
1189

1190
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
161,034✔
1191
  int32_t                   code = TSDB_CODE_SUCCESS;
161,034✔
1192
  int32_t                   lino = 0;
161,034✔
1193
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
161,034✔
1194
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
161,034✔
1195

1196
  if (pOperator->status == OP_EXEC_DONE) {
161,034✔
1197
    (*ppRes) = NULL;
11,689✔
1198
    return code;
11,689✔
1199
  }
1200

1201
  SSDataBlock* pBlock = pInfo->binfo.pRes;
149,345✔
1202
  code = pOperator->fpSet._openFn(pOperator);
149,345✔
1203
  QUERY_CHECK_CODE(code, lino, _end);
149,339!
1204

1205
  while (1) {
×
1206
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
149,339✔
1207
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
149,345✔
1208
    QUERY_CHECK_CODE(code, lino, _end);
149,343!
1209

1210
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
149,343✔
1211
    if (!hasRemain) {
149,345✔
1212
      setOperatorCompleted(pOperator);
16,648✔
1213
      break;
16,647✔
1214
    }
1215

1216
    if (pBlock->info.rows > 0) {
132,697!
1217
      break;
132,697✔
1218
    }
1219
  }
1220

1221
  size_t rows = pBlock->info.rows;
149,344✔
1222
  pOperator->resultInfo.totalRows += rows;
149,344✔
1223

1224
_end:
149,344✔
1225
  if (code != TSDB_CODE_SUCCESS) {
149,344!
1226
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1227
    pTaskInfo->code = code;
×
1228
    T_LONG_JMP(pTaskInfo->env, code);
×
1229
  }
1230
  (*ppRes) = (rows == 0) ? NULL : pBlock;
149,344✔
1231
  return code;
149,344✔
1232
}
1233

1234
static void destroyStateWindowOperatorInfo(void* param) {
2,829✔
1235
  if (param == NULL) {
2,829!
1236
    return;
×
1237
  }
1238
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
2,829✔
1239
  cleanupBasicInfo(&pInfo->binfo);
2,829✔
1240
  taosMemoryFreeClear(pInfo->stateKey.pData);
2,829!
1241
  if (pInfo->pOperator) {
2,829!
1242
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
2,829✔
1243
                      pInfo->cleanGroupResInfo);
2,829✔
1244
    pInfo->pOperator = NULL;
2,829✔
1245
  }
1246

1247
  cleanupExprSupp(&pInfo->scalarSup);
2,829✔
1248
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
2,829✔
1249
  cleanupAggSup(&pInfo->aggSup);
2,829✔
1250
  cleanupGroupResInfo(&pInfo->groupResInfo);
2,829✔
1251

1252
  taosMemoryFreeClear(param);
2,829!
1253
}
1254

1255
static void freeItem(void* param) {
216✔
1256
  SGroupKeys* pKey = (SGroupKeys*)param;
216✔
1257
  taosMemoryFree(pKey->pData);
216!
1258
}
216✔
1259

1260
void destroyIntervalOperatorInfo(void* param) {
19,172✔
1261
  if (param == NULL) {
19,172!
1262
    return;
×
1263
  }
1264

1265
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
19,172✔
1266

1267
  cleanupBasicInfo(&pInfo->binfo);
19,172✔
1268
  if (pInfo->pOperator) {
19,172!
1269
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
19,172✔
1270
                      pInfo->cleanGroupResInfo);
19,172✔
1271
    pInfo->pOperator = NULL;
19,172✔
1272
  }
1273

1274
  cleanupAggSup(&pInfo->aggSup);
19,172✔
1275
  cleanupExprSupp(&pInfo->scalarSupp);
19,172✔
1276

1277
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
19,172✔
1278

1279
  taosArrayDestroy(pInfo->pInterpCols);
19,172✔
1280
  pInfo->pInterpCols = NULL;
19,172✔
1281

1282
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
19,172✔
1283
  pInfo->pPrevValues = NULL;
19,172✔
1284

1285
  cleanupGroupResInfo(&pInfo->groupResInfo);
19,172✔
1286
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
19,172✔
1287
  destroyBoundedQueue(pInfo->pBQ);
19,172✔
1288
  taosMemoryFreeClear(param);
19,169!
1289
}
1290

1291
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
108✔
1292
  int32_t code = TSDB_CODE_SUCCESS;
108✔
1293
  int32_t lino = 0;
108✔
1294
  void*   tmp = NULL;
108✔
1295

1296
  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
108✔
1297
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
108!
1298

1299
  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
108✔
1300
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
108!
1301

1302
  {  // ts column
1303
    SColumn c = {0};
108✔
1304
    c.colId = 1;
108✔
1305
    c.slotId = pInfo->primaryTsIndex;
108✔
1306
    c.type = TSDB_DATA_TYPE_TIMESTAMP;
108✔
1307
    c.bytes = sizeof(int64_t);
108✔
1308
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
108✔
1309
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
108!
1310

1311
    SGroupKeys key;
1312
    key.bytes = c.bytes;
108✔
1313
    key.type = c.type;
108✔
1314
    key.isNull = true;  // to denote no value is assigned yet
108✔
1315
    key.pData = taosMemoryCalloc(1, c.bytes);
108!
1316
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
108!
1317

1318
    tmp = taosArrayPush(pInfo->pPrevValues, &key);
108✔
1319
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
108!
1320
  }
1321
_end:
108✔
1322
  if (code != TSDB_CODE_SUCCESS) {
108!
1323
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1324
  }
1325
  return code;
108✔
1326
}
1327

1328
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
19,127✔
1329
                                      bool* pRes) {
1330
  // the primary timestamp column
1331
  bool    needed = false;
19,127✔
1332
  int32_t code = TSDB_CODE_SUCCESS;
19,127✔
1333
  int32_t lino = 0;
19,127✔
1334
  void*   tmp = NULL;
19,127✔
1335

1336
  for (int32_t i = 0; i < numOfCols; ++i) {
102,407✔
1337
    SExprInfo* pExpr = pCtx[i].pExpr;
83,377✔
1338
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
83,377✔
1339
      needed = true;
108✔
1340
      break;
108✔
1341
    }
1342
  }
1343

1344
  if (needed) {
19,138✔
1345
    code = initWindowInterpPrevVal(pInfo);
108✔
1346
    QUERY_CHECK_CODE(code, lino, _end);
108!
1347
  }
1348

1349
  for (int32_t i = 0; i < numOfCols; ++i) {
102,500✔
1350
    SExprInfo* pExpr = pCtx[i].pExpr;
83,355✔
1351

1352
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
83,355✔
1353
      SFunctParam* pParam = &pExpr->base.pParam[0];
109✔
1354

1355
      SColumn c = *pParam->pCol;
109✔
1356
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
109✔
1357
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
108!
1358

1359
      SGroupKeys key = {0};
108✔
1360
      key.bytes = c.bytes;
108✔
1361
      key.type = c.type;
108✔
1362
      key.isNull = false;
108✔
1363
      key.pData = taosMemoryCalloc(1, c.bytes);
108!
1364
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
108!
1365

1366
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
108✔
1367
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
108!
1368
    }
1369
  }
1370

1371
_end:
19,145✔
1372
  if (code != TSDB_CODE_SUCCESS) {
19,145!
1373
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1374
  }
1375
  *pRes = needed;
19,159✔
1376
  return code;
19,159✔
1377
}
1378

1379
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo){
×
1380
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
1381
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
×
1382
  pOper->status = OP_NOT_OPENED;
×
1383

1384
  resetBasicOperatorState(&pIntervalInfo->binfo);
×
1385
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp, &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup,
×
1386
    pIntervalInfo->cleanGroupResInfo);
×
1387

1388
  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
×
1389
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
1390
  if (code == 0) {
×
1391
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
1392
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
1393
                       &pTaskInfo->storageAPI.functionStore);
1394
  }
1395
  if (code == 0) {
×
1396
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
×
1397
                         &pTaskInfo->storageAPI.functionStore);
1398
  }
1399

1400
  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL){
×
1401
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
×
1402
  }
1403

1404
  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
×
1405
    pIntervalInfo->curGroupId = UINT64_MAX;
×
1406
  }
1407

1408
  pIntervalInfo->cleanGroupResInfo = false;
×
1409
  pIntervalInfo->handledGroupNum = 0;
×
1410
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
×
1411
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;
×
1412

1413
  taosArrayDestroy(pIntervalInfo->pInterpCols);
×
1414
  pIntervalInfo->pInterpCols = NULL;
×
1415

1416
  if (pIntervalInfo->pPrevValues != NULL) {
×
1417
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
×
1418
    pIntervalInfo->pPrevValues = NULL;
×
1419
    initWindowInterpPrevVal(pIntervalInfo);
×
1420
  }
1421

1422
  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
×
1423
  destroyBoundedQueue(pIntervalInfo->pBQ);
×
1424
  pIntervalInfo->pBQ = NULL;
×
1425
  return code;
×
1426
}
1427

1428
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
×
1429
  SIntervalAggOperatorInfo* pInfo = pOper->info;
×
1430
  return resetInterval(pOper, pInfo);
×
1431
}
1432

1433
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
16,649✔
1434
                                   SOperatorInfo** pOptrInfo) {
1435
  QRY_PARAM_CHECK(pOptrInfo);
16,649!
1436

1437
  int32_t                   code = TSDB_CODE_SUCCESS;
16,649✔
1438
  int32_t                   lino = 0;
16,649✔
1439
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
16,649!
1440
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
16,663!
1441
  if (pInfo == NULL || pOperator == NULL) {
16,668!
1442
    code = terrno;
1✔
1443
    lino = __LINE__;
×
1444
    goto _error;
×
1445
  }
1446

1447
  pOperator->pPhyNode = pPhyNode;
16,667✔
1448
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
16,667✔
1449
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
16,666!
1450
  initBasicInfo(&pInfo->binfo, pResBlock);
16,666✔
1451

1452
  SExprSupp* pSup = &pOperator->exprSupp;
16,665✔
1453
  pSup->hasWindowOrGroup = true;
16,665✔
1454

1455
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
16,665✔
1456

1457
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
16,665✔
1458
  initResultSizeInfo(&pOperator->resultInfo, 512);
16,665✔
1459
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
16,667✔
1460
  QUERY_CHECK_CODE(code, lino, _error);
16,668!
1461

1462
  int32_t    num = 0;
16,668✔
1463
  SExprInfo* pExprInfo = NULL;
16,668✔
1464
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
16,668✔
1465
  QUERY_CHECK_CODE(code, lino, _error);
16,662!
1466

1467
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
16,662✔
1468
                    &pTaskInfo->storageAPI.functionStore);
1469
  QUERY_CHECK_CODE(code, lino, _error);
16,661!
1470

1471
  SInterval interval = {.interval = pPhyNode->interval,
16,661✔
1472
                        .sliding = pPhyNode->sliding,
16,661✔
1473
                        .intervalUnit = pPhyNode->intervalUnit,
16,661✔
1474
                        .slidingUnit = pPhyNode->slidingUnit,
16,661✔
1475
                        .offset = pPhyNode->offset,
16,661✔
1476
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
16,661✔
1477
                        .timeRange = pPhyNode->timeRange};
1478
  calcIntervalAutoOffset(&interval);
16,661✔
1479

1480
  STimeWindowAggSupp as = {
16,656✔
1481
      .waterMark = pPhyNode->window.watermark,
16,656✔
1482
      .calTrigger = pPhyNode->window.triggerType,
16,656✔
1483
      .maxTs = INT64_MIN,
1484
  };
1485

1486
  pInfo->win = pTaskInfo->window;
16,656✔
1487
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
16,656✔
1488
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
16,656✔
1489
  pInfo->interval = interval;
16,656✔
1490
  pInfo->twAggSup = as;
16,656✔
1491
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
16,656✔
1492
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
16,656!
1493
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
4,488✔
1494
    pInfo->limited = true;
4,488✔
1495
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
4,488✔
1496
  }
1497
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
16,656!
1498
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
126✔
1499
    pInfo->slimited = true;
126✔
1500
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
126✔
1501
    pInfo->curGroupId = UINT64_MAX;
126✔
1502
  }
1503

1504
  if (pPhyNode->window.pExprs != NULL) {
16,656✔
1505
    int32_t    numOfScalar = 0;
2,130✔
1506
    SExprInfo* pScalarExprInfo = NULL;
2,130✔
1507
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
2,130✔
1508
    QUERY_CHECK_CODE(code, lino, _error);
2,132!
1509

1510
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
2,132✔
1511
    if (code != TSDB_CODE_SUCCESS) {
2,133!
1512
      goto _error;
×
1513
    }
1514
  }
1515

1516
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
16,659✔
1517
                            pTaskInfo->pStreamRuntimeInfo);
16,659✔
1518
  if (code != TSDB_CODE_SUCCESS) {
16,642!
1519
    goto _error;
×
1520
  }
1521

1522
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
16,642✔
1523
  QUERY_CHECK_CODE(code, lino, _error);
16,652!
1524

1525
  pInfo->timeWindowInterpo = false;
16,652✔
1526
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
16,652✔
1527
  QUERY_CHECK_CODE(code, lino, _error);
16,652!
1528
  if (pInfo->timeWindowInterpo) {
16,652✔
1529
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
108✔
1530
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
108!
1531
      goto _error;
×
1532
    }
1533
  }
1534

1535
  pInfo->pOperator = pOperator;
16,652✔
1536
  pInfo->cleanGroupResInfo = false;
16,652✔
1537
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
16,652✔
1538
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
16,631✔
1539
                  pInfo, pTaskInfo);
1540

1541
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
16,646✔
1542
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1543
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
16,653✔
1544
  code = appendDownstream(pOperator, &downstream, 1);
16,656✔
1545
  if (code != TSDB_CODE_SUCCESS) {
16,641!
1546
    goto _error;
×
1547
  }
1548

1549
  *pOptrInfo = pOperator;
16,641✔
1550
  return TSDB_CODE_SUCCESS;
16,641✔
1551

1552
_error:
×
1553
  if (pInfo != NULL) {
×
1554
    destroyIntervalOperatorInfo(pInfo);
×
1555
  }
1556

1557
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1558
  pTaskInfo->code = code;
×
1559
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
×
1560
  return code;
×
1561
}
1562

1563
// todo handle multiple timeline cases. assume no timeline interweaving
1564
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
4,605✔
1565
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4,605✔
1566
  SExprSupp*     pSup = &pOperator->exprSupp;
4,605✔
1567

1568
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
4,605✔
1569
  if (!pColInfoData) {
4,605!
1570
    pTaskInfo->code = terrno;
×
1571
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1572
  }
1573

1574
  bool    masterScan = true;
4,605✔
1575
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
4,605✔
1576
  int64_t gid = pBlock->info.id.groupId;
4,605✔
1577

1578
  int64_t gap = pInfo->gap;
4,605✔
1579

1580
  if (!pInfo->reptScan) {
4,605✔
1581
    pInfo->reptScan = true;
1,260✔
1582
    pInfo->winSup.prevTs = INT64_MIN;
1,260✔
1583
  }
1584

1585
  SWindowRowsSup* pRowSup = &pInfo->winSup;
4,605✔
1586
  pRowSup->numOfRows = 0;
4,605✔
1587
  pRowSup->startRowIndex = 0;
4,605✔
1588

1589
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
1590
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
4,605✔
1591
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
787,910✔
1592
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
783,305✔
1593
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
3,831✔
1594
      doKeepTuple(pRowSup, tsList[j], gid);
3,831✔
1595
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
779,474✔
1596
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
259,506✔
1597
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1598
      doKeepTuple(pRowSup, tsList[j], gid);
520,028✔
1599
    } else {  // start a new session window
1600
      // start a new session window
1601
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
259,446✔
1602
        SResultRow* pResult = NULL;
259,293✔
1603

1604
        // keep the time window for the closed time window.
1605
        STimeWindow window = pRowSup->win;
259,293✔
1606

1607
        int32_t ret =
1608
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
259,293✔
1609
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1610
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
259,293!
1611
          T_LONG_JMP(pTaskInfo->env, ret);
×
1612
        }
1613

1614
        // pInfo->numOfRows data belong to the current session window
1615
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
259,293✔
1616
        ret =
1617
            applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
259,293✔
1618
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
259,293✔
1619
        if (ret != TSDB_CODE_SUCCESS) {
259,293!
1620
          T_LONG_JMP(pTaskInfo->env, ret);
×
1621
        }
1622
      }
1623

1624
      // here we start a new session window
1625
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
259,446✔
1626
      doKeepTuple(pRowSup, tsList[j], gid);
259,446✔
1627
    }
1628
  }
1629

1630
  SResultRow* pResult = NULL;
4,605✔
1631
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
4,605✔
1632
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
4,605✔
1633
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1634
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
4,605!
1635
    T_LONG_JMP(pTaskInfo->env, ret);
×
1636
  }
1637

1638
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
4,605✔
1639
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
4,605✔
1640
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
4,605✔
1641
  if (ret != TSDB_CODE_SUCCESS) {
4,605!
1642
    T_LONG_JMP(pTaskInfo->env, ret);
×
1643
  }
1644
}
4,605✔
1645

1646
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
2,321✔
1647
  if (pOperator->status == OP_EXEC_DONE) {
2,321✔
1648
    (*ppRes) = NULL;
911✔
1649
    return TSDB_CODE_SUCCESS;
911✔
1650
  }
1651

1652
  int32_t                  code = TSDB_CODE_SUCCESS;
1,410✔
1653
  int32_t                  lino = 0;
1,410✔
1654
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,410✔
1655
  SSessionAggOperatorInfo* pInfo = pOperator->info;
1,410✔
1656
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
1,410✔
1657
  SExprSupp*               pSup = &pOperator->exprSupp;
1,410✔
1658

1659
  if (pOperator->status == OP_RES_TO_RETURN) {
1,410✔
1660
    while (1) {
×
1661
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
115✔
1662
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
115✔
1663
      QUERY_CHECK_CODE(code, lino, _end);
115!
1664

1665
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
115✔
1666
      if (!hasRemain) {
115✔
1667
        setOperatorCompleted(pOperator);
18✔
1668
        break;
18✔
1669
      }
1670

1671
      if (pBInfo->pRes->info.rows > 0) {
97!
1672
        break;
97✔
1673
      }
1674
    }
1675
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
115✔
1676
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
115!
1677
    return code;
115✔
1678
  }
1679

1680
  int64_t st = taosGetTimestampUs();
1,295✔
1681
  int32_t order = pInfo->binfo.inputTsOrder;
1,295✔
1682

1683
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1,295✔
1684

1685
  pInfo->cleanGroupResInfo = false;
1,295✔
1686
  while (1) {
4,605✔
1687
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
5,900✔
1688
    if (pBlock == NULL) {
5,900✔
1689
      break;
1,295✔
1690
    }
1691

1692
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
4,605✔
1693
    if (pInfo->scalarSupp.pExprInfo != NULL) {
4,605✔
1694
      SExprSupp* pExprSup = &pInfo->scalarSupp;
3✔
1695
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
3✔
1696
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
3!
1697
      QUERY_CHECK_CODE(code, lino, _end);
3!
1698
    }
1699
    // the pDataBlock are always the same one, no need to call this again
1700
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
4,605✔
1701
    QUERY_CHECK_CODE(code, lino, _end);
4,605!
1702

1703
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
4,605✔
1704
    QUERY_CHECK_CODE(code, lino, _end);
4,605!
1705

1706
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
4,605✔
1707
  }
1708

1709
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1,295✔
1710

1711
  // restore the value
1712
  pOperator->status = OP_RES_TO_RETURN;
1,295✔
1713

1714
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
1,295✔
1715
  QUERY_CHECK_CODE(code, lino, _end);
1,295!
1716
  pInfo->cleanGroupResInfo = true;
1,295✔
1717

1718
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
1,295✔
1719
  QUERY_CHECK_CODE(code, lino, _end);
1,295!
1720
  while (1) {
×
1721
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,295✔
1722
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1,295✔
1723
    QUERY_CHECK_CODE(code, lino, _end);
1,295!
1724

1725
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,295✔
1726
    if (!hasRemain) {
1,295✔
1727
      setOperatorCompleted(pOperator);
1,249✔
1728
      break;
1,249✔
1729
    }
1730

1731
    if (pBInfo->pRes->info.rows > 0) {
46!
1732
      break;
46✔
1733
    }
1734
  }
1735
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1,295✔
1736

1737
_end:
1,295✔
1738
  if (code != TSDB_CODE_SUCCESS) {
1,295!
1739
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1740
    pTaskInfo->code = code;
×
1741
    T_LONG_JMP(pTaskInfo->env, code);
×
1742
  }
1743
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1,295✔
1744
  return code;
1,295✔
1745
}
1746

1747
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
×
1748
  SStateWindowOperatorInfo* pInfo = pOper->info;
×
1749
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
1750
  SStateWinodwPhysiNode* pPhynode = (SStateWinodwPhysiNode*)pOper->pPhyNode;
×
1751
  pOper->status = OP_NOT_OPENED;
×
1752

1753
  resetBasicOperatorState(&pInfo->binfo);
×
1754
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
×
1755
                    pInfo->cleanGroupResInfo);
×
1756

1757
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
×
1758
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
1759
  if (code == 0) {
×
1760
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
1761
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
1762
                       &pTaskInfo->storageAPI.functionStore);
1763
  }
1764
  if (code == 0) {
×
1765
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->window.pExprs, NULL,
×
1766
                         &pTaskInfo->storageAPI.functionStore);
1767
  }
1768

1769
  pInfo->cleanGroupResInfo = false;
×
1770
  pInfo->hasKey = false;
×
1771

1772
  cleanupGroupResInfo(&pInfo->groupResInfo);
×
1773
  memset(pInfo->stateKey.pData, 0, pInfo->stateKey.bytes);
×
1774
  return code;
×
1775
}
1776

1777
// todo make this as an non-blocking operator
1778
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
2,829✔
1779
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1780
  QRY_PARAM_CHECK(pOptrInfo);
2,829!
1781

1782
  int32_t                   code = TSDB_CODE_SUCCESS;
2,829✔
1783
  int32_t                   lino = 0;
2,829✔
1784
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
2,829!
1785
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,829!
1786
  if (pInfo == NULL || pOperator == NULL) {
2,829!
1787
    code = terrno;
×
1788
    goto _error;
×
1789
  }
1790

1791
  pOperator->pPhyNode = pStateNode;
2,829✔
1792
  pOperator->exprSupp.hasWindowOrGroup = true;
2,829✔
1793
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
2,829✔
1794
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
2,829✔
1795

1796
  if (pStateNode->window.pExprs != NULL) {
2,829✔
1797
    int32_t    numOfScalarExpr = 0;
1,955✔
1798
    SExprInfo* pScalarExprInfo = NULL;
1,955✔
1799
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
1,955✔
1800
    QUERY_CHECK_CODE(code, lino, _error);
1,955!
1801

1802
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
1,955✔
1803
    if (code != TSDB_CODE_SUCCESS) {
1,955!
1804
      goto _error;
×
1805
    }
1806
  }
1807

1808
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
2,829✔
1809
  pInfo->stateKey.type = pInfo->stateCol.type;
2,829✔
1810
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
2,829✔
1811
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
2,829!
1812
  if (pInfo->stateKey.pData == NULL) {
2,829!
1813
    goto _error;
×
1814
  }
1815
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
2,829✔
1816
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
2,829✔
1817

1818
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
2,829✔
1819
                            pTaskInfo->pStreamRuntimeInfo);
2,829✔
1820
  if (code != TSDB_CODE_SUCCESS) {
2,829!
1821
    goto _error;
×
1822
  }
1823

1824
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,829✔
1825

1826
  int32_t    num = 0;
2,829✔
1827
  SExprInfo* pExprInfo = NULL;
2,829✔
1828
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
2,829✔
1829
  QUERY_CHECK_CODE(code, lino, _error);
2,829!
1830

1831
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2,829✔
1832

1833
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
2,829✔
1834
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
2,829✔
1835
  if (code != TSDB_CODE_SUCCESS) {
2,829!
1836
    goto _error;
×
1837
  }
1838

1839
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
2,829✔
1840
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,829!
1841
  initBasicInfo(&pInfo->binfo, pResBlock);
2,829✔
1842
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
2,829✔
1843

1844
  pInfo->twAggSup =
2,829✔
1845
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
2,829✔
1846

1847
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
2,829✔
1848
  QUERY_CHECK_CODE(code, lino, _error);
2,829!
1849

1850
  pInfo->tsSlotId = tsSlotId;
2,829✔
1851
  pInfo->pOperator = pOperator;
2,829✔
1852
  pInfo->cleanGroupResInfo = false;
2,829✔
1853
  pInfo->trueForLimit = pStateNode->trueForLimit;
2,829✔
1854
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
2,829✔
1855
                  pTaskInfo);
1856
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
2,829✔
1857
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1858
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
2,829✔
1859

1860
  code = appendDownstream(pOperator, &downstream, 1);
2,829✔
1861
  if (code != TSDB_CODE_SUCCESS) {
2,829!
1862
    goto _error;
×
1863
  }
1864

1865
  *pOptrInfo = pOperator;
2,829✔
1866
  return TSDB_CODE_SUCCESS;
2,829✔
1867

1868
_error:
×
1869
  if (pInfo != NULL) {
×
1870
    destroyStateWindowOperatorInfo(pInfo);
×
1871
  }
1872

1873
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1874
  pTaskInfo->code = code;
×
1875
  return code;
×
1876
}
1877

1878
void destroySWindowOperatorInfo(void* param) {
1,295✔
1879
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
1,295✔
1880
  if (pInfo == NULL) {
1,295!
1881
    return;
×
1882
  }
1883

1884
  cleanupBasicInfo(&pInfo->binfo);
1,295✔
1885
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,295✔
1886
  if (pInfo->pOperator) {
1,295!
1887
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,295✔
1888
                      pInfo->cleanGroupResInfo);
1,295✔
1889
    pInfo->pOperator = NULL;
1,295✔
1890
  }
1891

1892
  cleanupAggSup(&pInfo->aggSup);
1,295✔
1893
  cleanupExprSupp(&pInfo->scalarSupp);
1,295✔
1894

1895
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,295✔
1896
  taosMemoryFreeClear(param);
1,295!
1897
}
1898

1899
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
×
1900
  SSessionAggOperatorInfo* pInfo = pOper->info;
×
1901
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
1902
  SSessionWinodwPhysiNode* pPhynode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
×
1903
  pOper->status = OP_NOT_OPENED;
×
1904

1905
  resetBasicOperatorState(&pInfo->binfo);
×
1906
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
×
1907
                    pInfo->cleanGroupResInfo);
×
1908

1909
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
×
1910
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
1911
  if (code == 0) {
×
1912
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
1913
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
1914
                       &pTaskInfo->storageAPI.functionStore);
1915
  }
1916
  if (code == 0) {
×
1917
    code = resetExprSupp(&pInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
×
1918
                         &pTaskInfo->storageAPI.functionStore);
1919
  }
1920

1921
  pInfo->cleanGroupResInfo = false;
×
1922
  pInfo->winSup = (SWindowRowsSup){0};
×
1923
  pInfo->winSup.prevTs = INT64_MIN;
×
1924
  pInfo->reptScan = false;
×
1925

1926
  cleanupGroupResInfo(&pInfo->groupResInfo);
×
1927
  return code;
×
1928
}
1929

1930
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
1,295✔
1931
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1932
  QRY_PARAM_CHECK(pOptrInfo);
1,295!
1933

1934
  int32_t                  code = TSDB_CODE_SUCCESS;
1,295✔
1935
  int32_t                  lino = 0;
1,295✔
1936
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
1,295!
1937
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,295!
1938
  if (pInfo == NULL || pOperator == NULL) {
1,295!
1939
    code = terrno;
×
1940
    goto _error;
×
1941
  }
1942

1943
  pOperator->pPhyNode = pSessionNode;
1,295✔
1944
  pOperator->exprSupp.hasWindowOrGroup = true;
1,295✔
1945

1946
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,295✔
1947
  initResultSizeInfo(&pOperator->resultInfo, 4096);
1,295✔
1948

1949
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
1,295✔
1950
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,295!
1951
  initBasicInfo(&pInfo->binfo, pResBlock);
1,295✔
1952

1953
  int32_t    numOfCols = 0;
1,295✔
1954
  SExprInfo* pExprInfo = NULL;
1,295✔
1955
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
1,295✔
1956
  QUERY_CHECK_CODE(code, lino, _error);
1,295!
1957

1958
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
1,295✔
1959
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
1,295✔
1960
  QUERY_CHECK_CODE(code, lino, _error);
1,295!
1961

1962
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
1,295✔
1963
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
1,295✔
1964
  pInfo->gap = pSessionNode->gap;
1,295✔
1965

1966
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1,295✔
1967
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
1,295✔
1968
  QUERY_CHECK_CODE(code, lino, _error);
1,295!
1969

1970
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
1,295✔
1971
  pInfo->binfo.pRes = pResBlock;
1,295✔
1972
  pInfo->winSup.prevTs = INT64_MIN;
1,295✔
1973
  pInfo->reptScan = false;
1,295✔
1974
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
1,295✔
1975
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
1,295✔
1976

1977
  if (pSessionNode->window.pExprs != NULL) {
1,295✔
1978
    int32_t    numOfScalar = 0;
1✔
1979
    SExprInfo* pScalarExprInfo = NULL;
1✔
1980
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
1✔
1981
    QUERY_CHECK_CODE(code, lino, _error);
1!
1982

1983
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
1✔
1984
    QUERY_CHECK_CODE(code, lino, _error);
1!
1985
  }
1986

1987
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,295✔
1988
                            pTaskInfo->pStreamRuntimeInfo);
1,295✔
1989
  QUERY_CHECK_CODE(code, lino, _error);
1,295!
1990

1991
  pInfo->pOperator = pOperator;
1,295✔
1992
  pInfo->cleanGroupResInfo = false;
1,295✔
1993
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
1,295✔
1994
                  pInfo, pTaskInfo);
1995
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
1,295✔
1996
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1997
  pOperator->pTaskInfo = pTaskInfo;
1,295✔
1998
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
1,295✔
1999

2000
  code = appendDownstream(pOperator, &downstream, 1);
1,295✔
2001
  QUERY_CHECK_CODE(code, lino, _error);
1,295!
2002

2003
  *pOptrInfo = pOperator;
1,295✔
2004
  return TSDB_CODE_SUCCESS;
1,295✔
2005

2006
_error:
×
2007
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2008
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2009
  pTaskInfo->code = code;
×
2010
  return code;
×
2011
}
2012

2013
void destroyMAIOperatorInfo(void* param) {
2,502✔
2014
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
2,502✔
2015
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
2,502✔
2016
  taosMemoryFreeClear(param);
2,502!
2017
}
2,502✔
2018

2019
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
1,951✔
2020
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
1,951✔
2021
  if (NULL == pResult) {
1,951!
2022
    return pResult;
×
2023
  }
2024
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
1,951✔
2025
  return pResult;
1,951✔
2026
}
2027

2028
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
3,685,877✔
2029
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2030
  if (*pResult == NULL) {
3,685,877✔
2031
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
1,951✔
2032
    if (*pResult == NULL) {
1,951!
2033
      return terrno;
×
2034
    }
2035
  }
2036

2037
  // set time window for current result
2038
  (*pResult)->win = (*win);
3,685,877✔
2039
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
3,685,877✔
2040
}
2041

2042
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
16,424✔
2043
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
2044
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
16,424✔
2045
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
16,424✔
2046

2047
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
16,424✔
2048
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
16,424✔
2049
  SInterval*     pInterval = &iaInfo->interval;
16,424✔
2050

2051
  int32_t  startPos = 0;
16,424✔
2052
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
16,424✔
2053

2054
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
16,424✔
2055

2056
  // there is an result exists
2057
  if (miaInfo->curTs != INT64_MIN) {
16,424✔
2058
    if (ts != miaInfo->curTs) {
2,307✔
2059
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
1,924✔
2060
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
1,924✔
2061
      miaInfo->curTs = ts;
1,924✔
2062
    }
2063
  } else {
2064
    miaInfo->curTs = ts;
14,117✔
2065
  }
2066

2067
  STimeWindow win = {0};
16,424✔
2068
  win.skey = miaInfo->curTs;
16,424✔
2069
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
16,424✔
2070

2071
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
16,424✔
2072
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
16,424!
2073
    T_LONG_JMP(pTaskInfo->env, ret);
×
2074
  }
2075

2076
  int32_t currPos = startPos;
16,424✔
2077

2078
  STimeWindow currWin = win;
16,424✔
2079
  while (++currPos < pBlock->info.rows) {
5,915,469✔
2080
    if (tsCols[currPos] == miaInfo->curTs) {
5,899,165✔
2081
      continue;
2,230,297✔
2082
    }
2083

2084
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
3,668,868✔
2085
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
3,669,004✔
2086
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
3,669,004✔
2087
    if (ret != TSDB_CODE_SUCCESS) {
3,669,818!
2088
      T_LONG_JMP(pTaskInfo->env, ret);
×
2089
    }
2090

2091
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
3,669,818✔
2092
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
3,667,771✔
2093
    miaInfo->curTs = tsCols[currPos];
3,668,087✔
2094

2095
    currWin.skey = miaInfo->curTs;
3,668,087✔
2096
    currWin.ekey =
3,669,232✔
2097
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
3,668,087✔
2098

2099
    startPos = currPos;
3,669,232✔
2100
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
3,669,232✔
2101
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
3,668,741!
2102
      T_LONG_JMP(pTaskInfo->env, ret);
×
2103
    }
2104

2105
    miaInfo->curTs = currWin.skey;
3,668,748✔
2106
  }
2107

2108
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
16,304✔
2109
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
16,424✔
2110
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
16,424✔
2111
  if (ret != TSDB_CODE_SUCCESS) {
16,424!
2112
    T_LONG_JMP(pTaskInfo->env, ret);
×
2113
  }
2114
}
16,424✔
2115

2116
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
14,115✔
2117
  pRes->info.id.groupId = pMiaInfo->groupId;
14,115✔
2118
  pMiaInfo->curTs = INT64_MIN;
14,115✔
2119
  pMiaInfo->groupId = 0;
14,115✔
2120
}
14,115✔
2121

2122
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
16,617✔
2123
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
16,617✔
2124
  int32_t                               code = TSDB_CODE_SUCCESS;
16,617✔
2125
  int32_t                               lino = 0;
16,617✔
2126
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
16,617✔
2127
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
16,617✔
2128

2129
  SExprSupp*      pSup = &pOperator->exprSupp;
16,617✔
2130
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
16,617✔
2131
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
16,617✔
2132
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
16,617✔
2133

2134
  while (1) {
14,442✔
2135
    SSDataBlock* pBlock = NULL;
31,059✔
2136
    if (pMiaInfo->prefetchedBlock == NULL) {
31,059✔
2137
      pBlock = getNextBlockFromDownstream(pOperator, 0);
18,893✔
2138
    } else {
2139
      pBlock = pMiaInfo->prefetchedBlock;
12,166✔
2140
      pMiaInfo->prefetchedBlock = NULL;
12,166✔
2141

2142
      pMiaInfo->groupId = pBlock->info.id.groupId;
12,166✔
2143
    }
2144

2145
    // no data exists, all query processing is done
2146
    if (pBlock == NULL) {
31,059✔
2147
      // close last unclosed time window
2148
      if (pMiaInfo->curTs != INT64_MIN) {
2,469✔
2149
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
1,949✔
2150
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
1,949✔
2151
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
1,949✔
2152
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
1,949✔
2153
        QUERY_CHECK_CODE(code, lino, _end);
1,949!
2154
      }
2155

2156
      setOperatorCompleted(pOperator);
2,469✔
2157
      break;
2,469✔
2158
    }
2159

2160
    if (pMiaInfo->groupId == 0) {
28,590✔
2161
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
2,532✔
2162
        pMiaInfo->groupId = pBlock->info.id.groupId;
647✔
2163
        pRes->info.id.groupId = pMiaInfo->groupId;
647✔
2164
      }
2165
    } else {
2166
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
26,058✔
2167
        // if there are unclosed time window, close it firstly.
2168
        if (pMiaInfo->curTs == INT64_MIN) {
12,166!
2169
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2170
          T_LONG_JMP(pTaskInfo->env, terrno);
×
2171
        }
2172
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
12,166✔
2173
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
12,166✔
2174

2175
        pMiaInfo->prefetchedBlock = pBlock;
12,166✔
2176
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
12,166✔
2177
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
12,166✔
2178
        QUERY_CHECK_CODE(code, lino, _end);
12,166!
2179
        if (pRes->info.rows == 0) {
12,166✔
2180
          // After filtering for last group, the result is empty, so we need to continue to process next group
2181
          continue;
49✔
2182
        } else {
2183
          break;
12,117✔
2184
        }
2185
      } else {
2186
        // continue
2187
        pRes->info.id.groupId = pMiaInfo->groupId;
13,892✔
2188
      }
2189
    }
2190

2191
    pRes->info.scanFlag = pBlock->info.scanFlag;
16,424✔
2192
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
16,424✔
2193
    QUERY_CHECK_CODE(code, lino, _end);
16,424!
2194

2195
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
16,424✔
2196

2197
    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
16,424✔
2198
    QUERY_CHECK_CODE(code, lino, _end);
16,424!
2199

2200
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
16,424✔
2201
      break;
2,031✔
2202
    }
2203
  }
2204

2205
_end:
16,617✔
2206
  if (code != TSDB_CODE_SUCCESS) {
16,617!
2207
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2208
    pTaskInfo->code = code;
×
2209
    T_LONG_JMP(pTaskInfo->env, code);
×
2210
  }
2211
}
16,617✔
2212

2213
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
16,070✔
2214
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
16,070✔
2215
  int32_t                               code = TSDB_CODE_SUCCESS;
16,070✔
2216
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
16,070✔
2217
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
16,070✔
2218
  if (pOperator->status == OP_EXEC_DONE) {
16,070✔
2219
    (*ppRes) = NULL;
2,124✔
2220
    return code;
2,124✔
2221
  }
2222

2223
  SSDataBlock* pRes = iaInfo->binfo.pRes;
13,946✔
2224
  blockDataCleanup(pRes);
13,946✔
2225

2226
  if (iaInfo->binfo.mergeResultBlock) {
13,946✔
2227
    while (1) {
2228
      if (pOperator->status == OP_EXEC_DONE) {
12,883✔
2229
        break;
1,077✔
2230
      }
2231

2232
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
11,806✔
2233
        break;
4,029✔
2234
      }
2235

2236
      doMergeAlignedIntervalAgg(pOperator);
7,777✔
2237
    }
2238
  } else {
2239
    doMergeAlignedIntervalAgg(pOperator);
8,840✔
2240
  }
2241

2242
  size_t rows = pRes->info.rows;
13,946✔
2243
  pOperator->resultInfo.totalRows += rows;
13,946✔
2244
  (*ppRes) = (rows == 0) ? NULL : pRes;
13,946✔
2245
  return code;
13,946✔
2246
}
2247

2248
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
×
2249
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;
×
2250
  
2251
  uint64_t     groupId;  // current groupId
2252
  int64_t      curTs;    // current ts
2253
  SSDataBlock* prefetchedBlock;
2254
  SResultRow*  pResultRow;
2255

2256
  pInfo->groupId = 0;
×
2257
  pInfo->curTs = INT64_MIN;
×
2258
  pInfo->prefetchedBlock = NULL;
×
2259
  pInfo->pResultRow = NULL;
×
2260

2261
  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
×
2262
}
2263

2264
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
2,502✔
2265
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2266
  QRY_PARAM_CHECK(pOptrInfo);
2,502!
2267

2268
  int32_t                               code = TSDB_CODE_SUCCESS;
2,502✔
2269
  int32_t                               lino = 0;
2,502✔
2270
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
2,502!
2271
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,502!
2272
  if (miaInfo == NULL || pOperator == NULL) {
2,502!
2273
    code = terrno;
×
2274
    goto _error;
×
2275
  }
2276

2277
  pOperator->pPhyNode = pNode;
2,502✔
2278
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
2,502!
2279
  if (miaInfo->intervalAggOperatorInfo == NULL) {
2,502!
2280
    code = terrno;
×
2281
    goto _error;
×
2282
  }
2283

2284
  SInterval interval = {.interval = pNode->interval,
2,502✔
2285
                        .sliding = pNode->sliding,
2,502✔
2286
                        .intervalUnit = pNode->intervalUnit,
2,502✔
2287
                        .slidingUnit = pNode->slidingUnit,
2,502✔
2288
                        .offset = pNode->offset,
2,502✔
2289
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
2,502✔
2290
                        .timeRange = pNode->timeRange};
2291
  calcIntervalAutoOffset(&interval);
2,502✔
2292

2293
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
2,502✔
2294
  SExprSupp*                pSup = &pOperator->exprSupp;
2,502✔
2295
  pSup->hasWindowOrGroup = true;
2,502✔
2296

2297
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
2,502✔
2298
                            pTaskInfo->pStreamRuntimeInfo);
2,502✔
2299
  QUERY_CHECK_CODE(code, lino, _error);
2,502!
2300

2301
  miaInfo->curTs = INT64_MIN;
2,502✔
2302
  iaInfo->win = pTaskInfo->window;
2,502✔
2303
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
2,502✔
2304
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
2,502✔
2305
  iaInfo->interval = interval;
2,502✔
2306
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
2,502✔
2307
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
2,502✔
2308

2309
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,502✔
2310
  initResultSizeInfo(&pOperator->resultInfo, 512);
2,502✔
2311

2312
  int32_t    num = 0;
2,502✔
2313
  SExprInfo* pExprInfo = NULL;
2,502✔
2314
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
2,502✔
2315
  QUERY_CHECK_CODE(code, lino, _error);
2,502!
2316

2317
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
2,502✔
2318
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
2,502✔
2319
  QUERY_CHECK_CODE(code, lino, _error);
2,502!
2320

2321
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
2,502✔
2322
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,502!
2323
  initBasicInfo(&iaInfo->binfo, pResBlock);
2,502✔
2324
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
2,502✔
2325
  QUERY_CHECK_CODE(code, lino, _error);
2,502!
2326

2327
  iaInfo->timeWindowInterpo = false;
2,502✔
2328
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
2,502✔
2329
  QUERY_CHECK_CODE(code, lino, _error);
2,502!
2330
  if (iaInfo->timeWindowInterpo) {
2,502!
2331
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2332
  }
2333

2334
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
2,502✔
2335
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
2,502✔
2336
  QUERY_CHECK_CODE(code, lino, _error);
2,502!
2337
  iaInfo->pOperator = pOperator;
2,502✔
2338
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
2,502✔
2339
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2340

2341
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
2,502✔
2342
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2343
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
2,502✔
2344

2345
  code = appendDownstream(pOperator, &downstream, 1);
2,502✔
2346
  QUERY_CHECK_CODE(code, lino, _error);
2,502!
2347

2348
  *pOptrInfo = pOperator;
2,502✔
2349
  return TSDB_CODE_SUCCESS;
2,502✔
2350

2351
_error:
×
2352
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2353
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2354
  pTaskInfo->code = code;
×
2355
  return code;
×
2356
}
2357

2358
//=====================================================================================================================
2359
// merge interval operator
2360
typedef struct SMergeIntervalAggOperatorInfo {
2361
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2362
  SList*                   groupIntervals;
2363
  SListIter                groupIntervalsIter;
2364
  bool                     hasGroupId;
2365
  uint64_t                 groupId;
2366
  SSDataBlock*             prefetchedBlock;
2367
  bool                     inputBlocksFinished;
2368
} SMergeIntervalAggOperatorInfo;
2369

2370
typedef struct SGroupTimeWindow {
2371
  uint64_t    groupId;
2372
  STimeWindow window;
2373
} SGroupTimeWindow;
2374

2375
void destroyMergeIntervalOperatorInfo(void* param) {
×
2376
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2377
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2378
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2379

2380
  taosMemoryFreeClear(param);
×
2381
}
×
2382

2383
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2384
                                        STimeWindow* newWin) {
2385
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2386
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2387
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2388

2389
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2390
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2391
  if (code != TSDB_CODE_SUCCESS) {
×
2392
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2393
    return code;
×
2394
  }
2395

2396
  SListIter iter = {0};
×
2397
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2398
  SListNode* listNode = NULL;
×
2399
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2400
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2401
    if (prevGrpWin->groupId != tableGroupId) {
×
2402
      continue;
×
2403
    }
2404

2405
    STimeWindow* prevWin = &prevGrpWin->window;
×
2406
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2407
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2408
      taosMemoryFreeClear(tmp);
×
2409
    }
2410
  }
2411

2412
  return TSDB_CODE_SUCCESS;
×
2413
}
2414

2415
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
2416
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
2417
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2418
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2419

2420
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
×
2421
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;
×
2422

2423
  int32_t     startPos = 0;
×
2424
  int32_t     numOfOutput = pExprSup->numOfExprs;
×
2425
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
×
2426
  uint64_t    tableGroupId = pBlock->info.id.groupId;
×
2427
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2428
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
×
2429
  SResultRow* pResult = NULL;
×
2430

2431
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
×
2432
                                        iaInfo->binfo.inputTsOrder);
2433

2434
  int32_t ret =
2435
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2436
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2437
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2438
    T_LONG_JMP(pTaskInfo->env, ret);
×
2439
  }
2440

2441
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
×
2442
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2443
                                                 iaInfo->binfo.inputTsOrder);
2444
  if (forwardRows <= 0) {
×
2445
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2446
  }
2447

2448
  // prev time window not interpolation yet.
2449
  if (iaInfo->timeWindowInterpo) {
×
2450
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
×
2451
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
×
2452

2453
    // restore current time window
2454
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2455
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2456
    if (ret != TSDB_CODE_SUCCESS) {
×
2457
      T_LONG_JMP(pTaskInfo->env, ret);
×
2458
    }
2459

2460
    // window start key interpolation
2461
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
×
2462
    if (ret != TSDB_CODE_SUCCESS) {
×
2463
      T_LONG_JMP(pTaskInfo->env, ret);
×
2464
    }
2465
  }
2466

2467
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
×
2468
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2469
                                        forwardRows, pBlock->info.rows, numOfOutput);
×
2470
  if (ret != TSDB_CODE_SUCCESS) {
×
2471
    T_LONG_JMP(pTaskInfo->env, ret);
×
2472
  }
2473
  doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2474

2475
  // output previous interval results after this interval (&win) is closed
2476
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
×
2477
  if (code != TSDB_CODE_SUCCESS) {
×
2478
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2479
    T_LONG_JMP(pTaskInfo->env, code);
×
2480
  }
2481

2482
  STimeWindow nextWin = win;
×
2483
  while (1) {
×
2484
    int32_t prevEndPos = forwardRows - 1 + startPos;
×
2485
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
×
2486
                                      iaInfo->binfo.inputTsOrder);
2487
    if (startPos < 0) {
×
2488
      break;
×
2489
    }
2490

2491
    // null data, failed to allocate more memory buffer
2492
    code =
2493
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
×
2494
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2495
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2496
      T_LONG_JMP(pTaskInfo->env, code);
×
2497
    }
2498

2499
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
×
2500
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2501
                                           iaInfo->binfo.inputTsOrder);
2502

2503
    // window start(end) key interpolation
2504
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
×
2505
    if (code != TSDB_CODE_SUCCESS) {
×
2506
      T_LONG_JMP(pTaskInfo->env, code);
×
2507
    }
2508

2509
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
×
2510
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2511
                                           forwardRows, pBlock->info.rows, numOfOutput);
×
2512
    if (code != TSDB_CODE_SUCCESS) {
×
2513
      T_LONG_JMP(pTaskInfo->env, code);
×
2514
    }
2515
    doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2516

2517
    // output previous interval results after this interval (&nextWin) is closed
2518
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
×
2519
    if (code != TSDB_CODE_SUCCESS) {
×
2520
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2521
      T_LONG_JMP(pTaskInfo->env, code);
×
2522
    }
2523
  }
2524

2525
  if (iaInfo->timeWindowInterpo) {
×
2526
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
×
2527
  }
2528
}
×
2529

2530
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
2531
  int32_t        code = TSDB_CODE_SUCCESS;
×
2532
  int32_t        lino = 0;
×
2533
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
2534

2535
  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
×
2536
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2537
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;
×
2538

2539
  if (pOperator->status == OP_EXEC_DONE) {
×
2540
    (*ppRes) = NULL;
×
2541
    return code;
×
2542
  }
2543

2544
  SSDataBlock* pRes = iaInfo->binfo.pRes;
×
2545
  blockDataCleanup(pRes);
×
2546
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
×
2547
  QUERY_CHECK_CODE(code, lino, _end);
×
2548

2549
  if (!miaInfo->inputBlocksFinished) {
×
2550
    SOperatorInfo* downstream = pOperator->pDownstream[0];
×
2551
    while (1) {
×
2552
      SSDataBlock* pBlock = NULL;
×
2553
      if (miaInfo->prefetchedBlock == NULL) {
×
2554
        pBlock = getNextBlockFromDownstream(pOperator, 0);
×
2555
      } else {
2556
        pBlock = miaInfo->prefetchedBlock;
×
2557
        miaInfo->groupId = pBlock->info.id.groupId;
×
2558
        miaInfo->prefetchedBlock = NULL;
×
2559
      }
2560

2561
      if (pBlock == NULL) {
×
2562
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
×
2563
        miaInfo->inputBlocksFinished = true;
×
2564
        break;
×
2565
      }
2566

2567
      if (!miaInfo->hasGroupId) {
×
2568
        miaInfo->hasGroupId = true;
×
2569
        miaInfo->groupId = pBlock->info.id.groupId;
×
2570
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
×
2571
        miaInfo->prefetchedBlock = pBlock;
×
2572
        break;
×
2573
      }
2574

2575
      pRes->info.scanFlag = pBlock->info.scanFlag;
×
2576
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
×
2577
      QUERY_CHECK_CODE(code, lino, _end);
×
2578

2579
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);
×
2580

2581
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
×
2582
        break;
×
2583
      }
2584
    }
2585

2586
    pRes->info.id.groupId = miaInfo->groupId;
×
2587
  }
2588

2589
  if (miaInfo->inputBlocksFinished) {
×
2590
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
×
2591

2592
    if (listNode != NULL) {
×
2593
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
×
2594
      pRes->info.id.groupId = grpWin->groupId;
×
2595
    }
2596
  }
2597

2598
  if (pRes->info.rows == 0) {
×
2599
    setOperatorCompleted(pOperator);
×
2600
  }
2601

2602
  size_t rows = pRes->info.rows;
×
2603
  pOperator->resultInfo.totalRows += rows;
×
2604

2605
_end:
×
2606
  if (code != TSDB_CODE_SUCCESS) {
×
2607
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2608
    pTaskInfo->code = code;
×
2609
    T_LONG_JMP(pTaskInfo->env, code);
×
2610
  }
2611
  (*ppRes) = (rows == 0) ? NULL : pRes;
×
2612
  return code;
×
2613
}
2614

2615
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
×
2616
  SMergeIntervalAggOperatorInfo* pInfo = pOper->info;
×
2617

2618
  pInfo->hasGroupId = false;
×
2619
  pInfo->groupId = 0;
×
2620
  pInfo->prefetchedBlock = NULL;
×
2621
  pInfo->inputBlocksFinished = false;
×
2622
  tdListEmpty(pInfo->groupIntervals);
×
2623
  
2624
  SIntervalAggOperatorInfo* pIntervalInfo = &pInfo->intervalAggOperatorInfo;
×
2625
  return resetInterval(pOper, pIntervalInfo);
×
2626
}
2627

2628
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2629
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2630
  QRY_PARAM_CHECK(pOptrInfo);
×
2631

2632
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2633
  int32_t                        lino = 0;
×
2634
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2635
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2636
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2637
    code = terrno;
×
2638
    goto _error;
×
2639
  }
2640

2641
  pOperator->pPhyNode = pIntervalPhyNode;
×
2642
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2643
                        .sliding = pIntervalPhyNode->sliding,
×
2644
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2645
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2646
                        .offset = pIntervalPhyNode->offset,
×
2647
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2648
                        .timeRange = pIntervalPhyNode->timeRange};
2649
  calcIntervalAutoOffset(&interval);
×
2650

2651
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2652

2653
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2654
  pIntervalInfo->win = pTaskInfo->window;
×
2655
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2656
  pIntervalInfo->interval = interval;
×
2657
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2658
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2659
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2660

2661
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2662
  pExprSupp->hasWindowOrGroup = true;
×
2663

2664
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2665
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2666

2667
  int32_t    num = 0;
×
2668
  SExprInfo* pExprInfo = NULL;
×
2669
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2670
  QUERY_CHECK_CODE(code, lino, _error);
×
2671

2672
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2673
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2674
  if (code != TSDB_CODE_SUCCESS) {
×
2675
    goto _error;
×
2676
  }
2677

2678
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2679
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2680
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2681
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2682
  QUERY_CHECK_CODE(code, lino, _error);
×
2683

2684
  pIntervalInfo->timeWindowInterpo = false;
×
2685
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2686
  QUERY_CHECK_CODE(code, lino, _error);
×
2687
  if (pIntervalInfo->timeWindowInterpo) {
×
2688
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2689
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2690
      goto _error;
×
2691
    }
2692
  }
2693

2694
  pIntervalInfo->pOperator = pOperator;
×
2695
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2696
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2697
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2698
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2699
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2700
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
2701

2702
  code = appendDownstream(pOperator, &downstream, 1);
×
2703
  if (code != TSDB_CODE_SUCCESS) {
×
2704
    goto _error;
×
2705
  }
2706

2707
  *pOptrInfo = pOperator;
×
2708
  return TSDB_CODE_SUCCESS;
×
2709
_error:
×
2710
  if (pMergeIntervalInfo != NULL) {
×
2711
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2712
  }
2713
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2714
  pTaskInfo->code = code;
×
2715
  return code;
×
2716
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc