• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4811

16 Oct 2025 11:40AM UTC coverage: 58.693% (+0.2%) from 58.518%
#4811

push

travis-ci

web-flow
fix(tref): increase TSDB_REF_OBJECTS from 100 to 2000 for improved reference handling (#33281)

139835 of 303532 branches covered (46.07%)

Branch coverage included in aggregate %.

211576 of 295200 relevant lines covered (71.67%)

16841075.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.47
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,
34
  RESULT_ROW_END_INTERP = 2,
35
} SResultTsInterpType;
36

37
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;
39
  uint64_t           groupId;
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
181,908,039✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
181,908,039✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
182,007,250!
56
    *pResult = NULL;
×
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
×
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
182,014,621✔
63
  *pResult = pResultRow;
182,014,621✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
182,014,621✔
65
}
66

67
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, int32_t rowIndex, uint64_t groupId) {
153,363,849✔
68
  pRowSup->win.ekey = ts;
153,363,849✔
69
  pRowSup->prevTs = ts;
153,363,849✔
70
  pRowSup->groupId = groupId;
153,363,849✔
71
  pRowSup->numOfRows += 1;
153,363,849✔
72
  if (hasContinuousNullRows(pRowSup)) {
153,363,849✔
73
    // rows having null state col are wrapped by rows of same state
74
    pRowSup->numOfRows += pRowSup->numNullRows;
87,744✔
75
    resetNumNullRows(pRowSup);
87,744✔
76
  }
77
}
153,355,102✔
78

79
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
103,376,951✔
80
  pRowSup->startRowIndex = rowIndex;
103,376,951✔
81
  pRowSup->numOfRows = 0;
103,376,951✔
82
  pRowSup->win.skey = tsList[rowIndex];
103,376,951✔
83
  pRowSup->groupId = groupId;
103,376,951✔
84
}
103,376,951✔
85

86
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
×
87
                                            int32_t order, int64_t* pData) {
88
  int32_t forwardRows = 0;
147,333,154✔
89

90
  if (order == TSDB_ORDER_ASC) {
×
91
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
100,456,533✔
92
    if (end >= 0) {
100,374,825!
93
      forwardRows = end;
100,382,772✔
94

95
      while (pData[end + pos] == ekey) {
102,350,362!
96
        forwardRows += 1;
1,967,590✔
97
        ++pos;
1,967,590✔
98
      }
99
    }
100
  } else {
101
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
46,876,621✔
102
    if (end >= 0) {
46,884,569!
103
      forwardRows = end;
46,890,981✔
104

105
      while (pData[end + pos] == ekey) {
49,311,080!
106
        forwardRows += 1;
2,420,099✔
107
        ++pos;
2,420,099✔
108
      }
109
    }
110
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
111
    //    if (end >= 0) {
112
    //      forwardRows = pos - end;
113
    //
114
    //      if (pData[end] == ekey) {
115
    //        forwardRows += 1;
116
    //      }
117
    //    }
118
  }
119

120
  return forwardRows;
147,259,394✔
121
}
122

123
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
257,831,946✔
124
  int32_t midPos = -1;
257,831,946✔
125
  int32_t numOfRows;
126

127
  if (num <= 0) {
257,831,946!
128
    return -1;
×
129
  }
130

131
  TSKEY*  keyList = (TSKEY*)pValue;
257,831,946✔
132
  int32_t firstPos = 0;
257,831,946✔
133
  int32_t lastPos = num - 1;
257,831,946✔
134

135
  if (order == TSDB_ORDER_DESC) {
257,831,946✔
136
    // find the first position which is smaller than the key
137
    while (1) {
138
      if (key >= keyList[firstPos]) return firstPos;
511,813,193✔
139
      if (key == keyList[lastPos]) return lastPos;
485,821,378✔
140

141
      if (key < keyList[lastPos]) {
485,269,970✔
142
        lastPos += 1;
57,378,672✔
143
        if (lastPos >= num) {
57,378,672!
144
          return -1;
×
145
        } else {
146
          return lastPos;
57,378,672✔
147
        }
148
      }
149

150
      numOfRows = lastPos - firstPos + 1;
427,891,298✔
151
      midPos = (numOfRows >> 1) + firstPos;
427,891,298✔
152

153
      if (key < keyList[midPos]) {
427,891,298✔
154
        firstPos = midPos + 1;
98,969,043✔
155
      } else if (key > keyList[midPos]) {
328,922,255!
156
        lastPos = midPos - 1;
329,180,316✔
157
      } else {
158
        break;
×
159
      }
160
    }
161

162
  } else {
163
    // find the first position which is bigger than the key
164
    while (1) {
165
      if (key <= keyList[firstPos]) return firstPos;
1,126,434,482✔
166
      if (key == keyList[lastPos]) return lastPos;
1,073,930,586✔
167

168
      if (key > keyList[lastPos]) {
1,072,994,344✔
169
        lastPos = lastPos + 1;
119,136,501✔
170
        if (lastPos >= num)
119,136,501!
171
          return -1;
×
172
        else
173
          return lastPos;
119,136,501✔
174
      }
175

176
      numOfRows = lastPos - firstPos + 1;
953,857,843✔
177
      midPos = (numOfRows >> 1u) + firstPos;
953,857,843✔
178

179
      if (key < keyList[midPos]) {
953,857,843✔
180
        lastPos = midPos - 1;
731,554,782✔
181
      } else if (key > keyList[midPos]) {
222,303,061✔
182
        firstPos = midPos + 1;
220,711,588✔
183
      } else {
184
        break;
1,591,473✔
185
      }
186
    }
187
  }
188

189
  return midPos;
1,333,412✔
190
}
191

192
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
151,688,144✔
193
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
194
  int32_t num = -1;
151,688,144✔
195
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
151,688,144✔
196

197
  if (order == TSDB_ORDER_ASC) {
151,688,144✔
198
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
103,165,660!
199
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
100,400,490!
200
      if (item != NULL) {
100,374,825!
201
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
202
      }
203
    } else {
204
      num = pDataBlockInfo->rows - startPos;
2,765,170✔
205
      if (item != NULL) {
2,765,170!
206
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
207
      }
208
    }
209
  } else {  // desc
210
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
48,522,484!
211
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
46,932,664!
212
      if (item != NULL) {
46,884,569!
213
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
214
      }
215
    } else {
216
      num = pDataBlockInfo->rows - startPos;
1,589,820✔
217
      if (item != NULL) {
1,589,820!
218
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
219
      }
220
    }
221
  }
222

223
  return num;
151,614,384✔
224
}
225

226
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
11,436,853✔
227
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
228
  SqlFunctionCtx* pCtx = pSup->pCtx;
11,436,853✔
229

230
  int32_t index = 1;
11,436,853✔
231
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
23,556,836✔
232
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
12,116,510✔
233
      pCtx[k].start.key = INT64_MIN;
678,704✔
234
      continue;
678,704✔
235
    }
236

237
    SFunctParam*     pParam = &pCtx[k].param[0];
11,440,045✔
238
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
11,440,045✔
239

240
    double v1 = 0, v2 = 0, v = 0;
11,427,155✔
241
    if (prevRowIndex == -1) {
11,427,155!
242
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
×
243
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
1!
244
    } else {
245
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
11,444,297!
246
                     typeGetTypeModFromColInfo(&pColInfo->info));
247
    }
248

249
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
11,444,298!
250
                   typeGetTypeModFromColInfo(&pColInfo->info));
251

252
#if 0
253
    if (functionId == FUNCTION_INTERP) {
254
      if (type == RESULT_ROW_START_INTERP) {
255
        pCtx[k].start.key = prevTs;
256
        pCtx[k].start.val = v1;
257

258
        pCtx[k].end.key = curTs;
259
        pCtx[k].end.val = v2;
260

261
        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_VARBINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR ||
262
            pColInfo->info.type == TSDB_DATA_TYPE_GEOMETRY) {
263
          if (prevRowIndex == -1) {
264
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
265
          } else {
266
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
267
          }
268

269
          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
270
        }
271
      }
272
    } else if (functionId == FUNCTION_TWA) {
273
#endif
274

275
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
11,444,298✔
276
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
11,444,298✔
277
    SPoint point = (SPoint){.key = windowKey, .val = &v};
11,444,298✔
278

279
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
11,444,298✔
280
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
6,761,000✔
281
    }
282

283
    if (type == RESULT_ROW_START_INTERP) {
11,441,279✔
284
      pCtx[k].start.key = point.key;
5,584,290✔
285
      pCtx[k].start.val = v;
5,584,290✔
286
    } else {
287
      pCtx[k].end.key = point.key;
5,856,989✔
288
      pCtx[k].end.val = v;
5,856,989✔
289
    }
290

291
    index += 1;
11,441,279✔
292
  }
293
#if 0
294
  }
295
#endif
296
}
11,440,326✔
297

298
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
788,947✔
299
  if (type == RESULT_ROW_START_INTERP) {
788,947✔
300
    for (int32_t k = 0; k < numOfOutput; ++k) {
1,127,830✔
301
      pCtx[k].start.key = INT64_MIN;
597,110✔
302
    }
303
  } else {
304
    for (int32_t k = 0; k < numOfOutput; ++k) {
567,535✔
305
      pCtx[k].end.key = INT64_MIN;
309,308✔
306
    }
307
  }
308
}
788,947✔
309

310
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
6,085,545✔
311
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
312
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
6,085,545✔
313

314
  TSKEY curTs = tsCols[pos];
6,085,545✔
315

316
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
6,085,545✔
317
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
6,084,657✔
318

319
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
320
  // start exactly from this point, no need to do interpolation
321
  TSKEY key = ascQuery ? win->skey : win->ekey;
6,084,657!
322
  if (key == curTs) {
6,084,657✔
323
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
340,451✔
324
    return true;
340,462✔
325
  }
326

327
  // it is the first time window, no need to do interpolation
328
  if (pTsKey->isNull && pos == 0) {
5,744,206!
329
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
165,092✔
330
  } else {
331
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
5,579,114!
332
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
5,579,114✔
333
                              RESULT_ROW_START_INTERP, pSup);
334
  }
335

336
  return true;
5,749,198✔
337
}
338

339
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
6,115,736✔
340
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
341
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
342
  int32_t code = TSDB_CODE_SUCCESS;
6,115,736✔
343
  int32_t lino = 0;
6,115,736✔
344
  int32_t order = pInfo->binfo.inputTsOrder;
6,115,736✔
345

346
  TSKEY actualEndKey = tsCols[endRowIndex];
6,115,736✔
347
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
6,115,736!
348

349
  // not ended in current data block, do not invoke interpolation
350
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
6,115,736!
351
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
192,475✔
352
    (*pRes) = false;
193,003✔
353
    return code;
193,003✔
354
  }
355

356
  // there is actual end point of current time window, no interpolation needs
357
  if (key == actualEndKey) {
5,923,261✔
358
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
65,238✔
359
    (*pRes) = true;
65,238✔
360
    return code;
65,238✔
361
  }
362

363
  if (nextRowIndex < 0) {
5,858,023!
364
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
365
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
366
  }
367

368
  TSKEY nextKey = tsCols[nextRowIndex];
5,858,023✔
369
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
5,858,023✔
370
                            RESULT_ROW_END_INTERP, pSup);
371
  (*pRes) = true;
5,856,924✔
372
  return code;
5,856,924✔
373
}
374

375
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
150,134,362✔
376
  if (pInterval->interval != pInterval->sliding &&
150,134,362✔
377
      ((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart))) {
113,677,605!
378
    return false;
×
379
  }
380

381
  return true;
150,134,362✔
382
}
383

384
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
150,137,608✔
385
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
150,137,608✔
386
}
387

388
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
152,002,557✔
389
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
390
  bool ascQuery = (order == TSDB_ORDER_ASC);
152,002,557✔
391

392
  int32_t precision = pInterval->precision;
152,002,557✔
393
  getNextTimeWindow(pInterval, pNext, order);
152,002,557✔
394

395
  // next time window is not in current block
396
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
151,777,805!
397
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
150,700,988✔
398
    return -1;
1,632,455✔
399
  }
400

401
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
150,145,350!
402
    return -1;
×
403
  }
404

405
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
150,134,852✔
406
  int32_t startPos = 0;
150,134,852✔
407

408
  // tumbling time window query, a special case of sliding time window query
409
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
150,134,852!
410
    startPos = prevPosition + 1;
36,518,136✔
411
  } else {
412
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
113,616,716✔
413
      startPos = 0;
2,895,053✔
414
    } else {
415
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
110,721,663✔
416
    }
417
  }
418

419
  /* interp query with fill should not skip time window */
420
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
421
  //    return startPos;
422
  //  }
423

424
  /*
425
   * This time window does not cover any data, try next time window,
426
   * this case may happen when the time window is too small
427
   */
428
  if (primaryKeys != NULL) {
150,099,751✔
429
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
181,958,560✔
430
      TSKEY next = primaryKeys[startPos];
31,863,665✔
431
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
31,863,665✔
432
        pNext->skey = taosTimeTruncate(next, pInterval);
919✔
433
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
4✔
434
      } else {
435
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
31,862,746✔
436
        pNext->skey = pNext->ekey - pInterval->interval + 1;
31,862,746✔
437
      }
438
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
118,229,662✔
439
      TSKEY next = primaryKeys[startPos];
13,602,306✔
440
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
13,602,306!
441
        pNext->skey = taosTimeTruncate(next, pInterval);
×
442
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
443
      } else {
444
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
13,602,601✔
445
        pNext->ekey = pNext->skey + pInterval->interval - 1;
13,602,601✔
446
      }
447
    }
448
  }
449

450
  return startPos;
150,101,614✔
451
}
452

453
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
18,329,616✔
454
  if (type == RESULT_ROW_START_INTERP) {
18,329,616✔
455
    return pResult->startInterp == true;
6,110,778✔
456
  } else {
457
    return pResult->endInterp == true;
12,218,838✔
458
  }
459
}
460

461
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
12,010,285✔
462
  if (type == RESULT_ROW_START_INTERP) {
12,010,285✔
463
    pResult->startInterp = true;
6,090,005✔
464
  } else {
465
    pResult->endInterp = true;
5,920,280✔
466
  }
467
}
12,010,285✔
468

469
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
150,168,592✔
470
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
471
  int32_t code = TSDB_CODE_SUCCESS;
150,168,592✔
472
  int32_t lino = 0;
150,168,592✔
473
  if (!pInfo->timeWindowInterpo) {
150,168,592✔
474
    return code;
144,092,528✔
475
  }
476

477
  if (pBlock == NULL) {
6,076,064!
478
    code = TSDB_CODE_INVALID_PARA;
×
479
    return code;
×
480
  }
481

482
  if (pBlock->pDataBlock == NULL) {
6,076,064!
483
    return code;
×
484
  }
485

486
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
6,076,064✔
487

488
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
6,110,792✔
489
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
6,110,792✔
490
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
6,110,769✔
491
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
6,085,573✔
492
    if (interp) {
6,089,924!
493
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
6,089,931✔
494
    }
495
  } else {
496
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
25,196✔
497
  }
498

499
  // point interpolation does not require the end key time window interpolation.
500
  // interpolation query does not generate the time window end interpolation
501
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
6,115,176✔
502
  if (!done) {
6,115,193!
503
    int32_t endRowIndex = startPos + forwardRows - 1;
6,115,250✔
504
    int32_t nextRowIndex = endRowIndex + 1;
6,115,250✔
505

506
    // duplicated ts row does not involve in the interpolation of end value for current time window
507
    int32_t x = endRowIndex;
6,115,250✔
508
    while (x > 0) {
6,115,303✔
509
      if (tsCols[x] == tsCols[x - 1]) {
6,055,712✔
510
        x -= 1;
53✔
511
      } else {
512
        endRowIndex = x;
6,055,659✔
513
        break;
6,055,659✔
514
      }
515
    }
516

517
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
6,115,250!
518
    bool  interp = false;
6,115,250✔
519
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
6,115,250✔
520
                                           win, &interp);
521
    QUERY_CHECK_CODE(code, lino, _end);
6,115,565!
522
    if (interp) {
6,115,565✔
523
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
5,922,634✔
524
    }
525
  } else {
526
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
527
  }
528

529
_end:
6,115,501✔
530
  if (code != TSDB_CODE_SUCCESS) {
6,115,501!
531
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
532
  }
533
  return code;
6,115,468✔
534
}
535

536
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
69,436✔
537
  if (pBlock->pDataBlock == NULL) {
69,436!
538
    return;
×
539
  }
540

541
  size_t num = taosArrayGetSize(pPrevKeys);
69,436✔
542
  for (int32_t k = 0; k < num; ++k) {
208,305✔
543
    SColumn* pc = taosArrayGet(pCols, k);
138,870✔
544

545
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);
138,869✔
546

547
    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
138,867✔
548
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
138,869!
549
      if (colDataIsNull_s(pColInfo, i)) {
277,738!
550
        continue;
×
551
      }
552

553
      char* val = colDataGetData(pColInfo, i);
138,869!
554
      if (IS_VAR_DATA_TYPE(pkey->type)) {
138,869!
555
        if (IS_STR_DATA_BLOB(pkey->type)) {
×
556
          memcpy(pkey->pData, val, blobDataTLen(val));
×
557
        } else {
558
          memcpy(pkey->pData, val, varDataTLen(val));
×
559
        }
560
      } else {
561
        memcpy(pkey->pData, val, pkey->bytes);
138,869✔
562
      }
563

564
      break;
138,869✔
565
    }
566
  }
567
}
568

569
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
69,436✔
570
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
571
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
69,436✔
572

573
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
69,436✔
574
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
69,436✔
575

576
  int32_t startPos = 0;
69,436✔
577
  int32_t numOfOutput = pSup->numOfExprs;
69,436✔
578

579
  SResultRow* pResult = NULL;
69,436✔
580

581
  while (1) {
1✔
582
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
69,437✔
583
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
69,437✔
584
    uint64_t            groupId = pOpenWin->groupId;
69,437✔
585
    SResultRowPosition* p1 = &pOpenWin->pos;
69,437✔
586
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
69,437!
587
      break;
69,436✔
588
    }
589

590
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
1✔
591
    if (NULL == pr) {
1!
592
      T_LONG_JMP(pTaskInfo->env, terrno);
×
593
    }
594

595
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
1!
596
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
597
      T_LONG_JMP(pTaskInfo->env, terrno);
×
598
    }
599

600
    if (pr->closed) {
1!
601
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
602
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
603
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
604
        T_LONG_JMP(pTaskInfo->env, terrno);
×
605
      }
606
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
607
      taosMemoryFree(pNode);
×
608
      continue;
×
609
    }
610

611
    STimeWindow w = pr->win;
1✔
612
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
1✔
613
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
614
    if (ret != TSDB_CODE_SUCCESS) {
1!
615
      T_LONG_JMP(pTaskInfo->env, ret);
×
616
    }
617

618
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
1!
619
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
620
      T_LONG_JMP(pTaskInfo->env, terrno);
×
621
    }
622

623
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
1✔
624
    if (!pTsKey) {
1!
625
      pTaskInfo->code = terrno;
×
626
      T_LONG_JMP(pTaskInfo->env, terrno);
×
627
    }
628

629
    int64_t prevTs = *(int64_t*)pTsKey->pData;
1✔
630
    if (groupId == pBlock->info.id.groupId) {
1!
631
      TSKEY curTs = pBlock->info.window.skey;
1✔
632
      if (tsCols != NULL) {
1!
633
        curTs = tsCols[startPos];
1✔
634
      }
635
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
1✔
636
                                RESULT_ROW_END_INTERP, pSup);
637
    }
638

639
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
1✔
640
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
1✔
641

642
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
1✔
643
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
1✔
644
                                          pBlock->info.rows, numOfExprs);
1✔
645
    if (ret != TSDB_CODE_SUCCESS) {
1!
646
      T_LONG_JMP(pTaskInfo->env, ret);
×
647
    }
648

649
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
1!
650
      closeResultRow(pr);
1✔
651
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
1✔
652
      taosMemoryFree(pNode);
1!
653
    } else {  // the remains are can not be closed yet.
654
      break;
×
655
    }
656
  }
657
}
69,436✔
658

659
static bool tsKeyCompFn(void* l, void* r, void* param) {
2,100,769✔
660
  TSKEY*                    lTS = (TSKEY*)l;
2,100,769✔
661
  TSKEY*                    rTS = (TSKEY*)r;
2,100,769✔
662
  SIntervalAggOperatorInfo* pInfo = param;
2,100,769✔
663
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
2,100,769!
664
}
665

666
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
268,922✔
667
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
268,922✔
668
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
268,922✔
669
  return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
268,922✔
670
}
671

672
/**
673
 * @brief check if cur window should be filtered out by limit info
674
 * @retval true if should be filtered out
675
 * @retval false if not filtering out
676
 * @note If no limit info, we skip filtering.
677
 *       If input/output ts order mismatch, we skip filtering too.
678
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
679
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
680
 *       every tuple in every block.
681
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
682
 */
683
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
151,747,912✔
684
                                  SExecTaskInfo* pTaskInfo) {
685
  int32_t code = TSDB_CODE_SUCCESS;
151,747,912✔
686
  int32_t lino = 0;
151,747,912✔
687
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
151,747,912✔
688
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
1,682,352✔
689
      // if input/output ts order mismatch, no filter
690
  ) {
691
    return false;
151,452,619✔
692
  }
693

694
  if (pOperatorInfo->limit == 0) return true;
295,293✔
695

696
  if (pOperatorInfo->pBQ == NULL) {
295,289✔
697
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
10,124✔
698
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
10,124!
699
  }
700

701
  bool shouldFilter = false;
295,289✔
702
  // if BQ has been full, compare it with top of BQ
703
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
295,289✔
704
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
46,597✔
705
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
46,597✔
706
  }
707
  if (shouldFilter) {
295,282✔
708
    return true;
26,359✔
709
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
268,923✔
710
    return false;
42,461✔
711
  }
712

713
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
714
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
226,554!
715
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
226,622!
716

717
  *((TSKEY*)node.data) = win->skey;
226,622✔
718

719
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
226,622!
720
    taosMemoryFree(node.data);
×
721
    return true;
×
722
  }
723

724
_end:
226,449✔
725
  if (code != TSDB_CODE_SUCCESS) {
226,449!
726
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
727
    pTaskInfo->code = code;
×
728
    T_LONG_JMP(pTaskInfo->env, code);
×
729
  }
730
  return false;
226,449✔
731
}
732

733
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
1,658,749✔
734
                            int32_t scanFlag) {
735
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
1,658,749✔
736

737
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
1,658,749✔
738
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
1,658,749✔
739

740
  int32_t     startPos = 0;
1,658,749✔
741
  int32_t     numOfOutput = pSup->numOfExprs;
1,658,749✔
742
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
1,658,749✔
743
  uint64_t    tableGroupId = pBlock->info.id.groupId;
1,658,663✔
744
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
1,658,663✔
745
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
1,658,663✔
746
  SResultRow* pResult = NULL;
1,658,654✔
747

748
  if (tableGroupId != pInfo->curGroupId) {
1,658,654✔
749
    pInfo->handledGroupNum += 1;
23,066✔
750
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
23,066✔
751
      return true;
44✔
752
    } else {
753
      pInfo->curGroupId = tableGroupId;
23,022✔
754
      destroyBoundedQueue(pInfo->pBQ);
23,022✔
755
      pInfo->pBQ = NULL;
23,020✔
756
    }
757
  }
758

759
  STimeWindow win =
760
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
1,658,608✔
761
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;
1,658,692✔
762

763
  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
1,637,613✔
764
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
765
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
1,637,643!
766
    T_LONG_JMP(pTaskInfo->env, ret);
5!
767
  }
768

769
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
1,637,638✔
770
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
1,637,638✔
771
                                                 pInfo->binfo.inputTsOrder);
772

773
  // prev time window not interpolation yet.
774
  if (pInfo->timeWindowInterpo) {
1,637,618✔
775
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
69,436✔
776
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
69,436✔
777

778
    // restore current time window
779
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
69,436✔
780
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
781
    if (ret != TSDB_CODE_SUCCESS) {
69,435!
782
      T_LONG_JMP(pTaskInfo->env, ret);
×
783
    }
784

785
    // window start key interpolation
786
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
69,435✔
787
    if (ret != TSDB_CODE_SUCCESS) {
69,436!
788
      T_LONG_JMP(pTaskInfo->env, ret);
×
789
    }
790
  }
791
  // qDebug("hashIntervalAgg1 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
792
  //   win.skey, win.ekey, startPos, forwardRows);
793
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
1,637,618✔
794
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
1,637,607✔
795
                                        pBlock->info.rows, numOfOutput);
1,637,607✔
796
  if (ret != TSDB_CODE_SUCCESS) {
1,637,744!
797
    T_LONG_JMP(pTaskInfo->env, ret);
×
798
  }
799

800
  doCloseWindow(pResultRowInfo, pInfo, pResult);
1,637,744✔
801

802
  STimeWindow nextWin = win;
1,637,738✔
803
  while (1) {
150,377,058✔
804
    int32_t prevEndPos = forwardRows - 1 + startPos;
152,014,796✔
805
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
152,014,796✔
806
                                      pInfo->binfo.inputTsOrder);
807
    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
151,703,909✔
808
      break;
809
    }
810
    // null data, failed to allocate more memory buffer
811
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
150,087,837✔
812
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
813
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
149,994,260!
814
      T_LONG_JMP(pTaskInfo->env, code);
×
815
    }
816

817
    // qDebug("hashIntervalAgg2 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
818
      // nextWin.skey, nextWin.ekey, startPos, forwardRows);
819

820
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
150,014,316✔
821
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
150,014,316✔
822
                                           pInfo->binfo.inputTsOrder);
823
    // window start(end) key interpolation
824
    code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
150,014,475✔
825
    if (code != TSDB_CODE_SUCCESS) {
150,114,167!
826
      T_LONG_JMP(pTaskInfo->env, code);
×
827
    }
828
    // TODO: add to open window? how to close the open windows after input blocks exhausted?
829
#if 0
830
    if ((ascScan && ekey <= pBlock->info.window.ekey) ||
831
        (!ascScan && ekey >= pBlock->info.window.skey)) {
832
      // window start(end) key interpolation
833
      doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
834
    } else if (pInfo->timeWindowInterpo) {
835
      addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
836
    }
837
#endif
838
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
150,114,167✔
839
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
150,115,971✔
840
                                          pBlock->info.rows, numOfOutput);
150,115,971✔
841
    if (ret != TSDB_CODE_SUCCESS) {
150,404,731!
842
      T_LONG_JMP(pTaskInfo->env, ret);
×
843
    }
844
    doCloseWindow(pResultRowInfo, pInfo, pResult);
150,404,731✔
845
  }
846

847
  if (pInfo->timeWindowInterpo) {
1,616,592✔
848
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
69,436✔
849
  }
850
  return false;
1,637,763✔
851
}
852

853
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
152,019,885✔
854
  // current result is done in computing final results.
855
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
152,019,885✔
856
    closeResultRow(pResult);
5,922,795✔
857
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
5,922,605✔
858
    taosMemoryFree(pNode);
5,922,518!
859
  }
860
}
152,019,273✔
861

862
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
69,436✔
863
                                       SExecTaskInfo* pTaskInfo) {
864
  int32_t         code = TSDB_CODE_SUCCESS;
69,436✔
865
  int32_t         lino = 0;
69,436✔
866
  SOpenWindowInfo openWin = {0};
69,436✔
867
  openWin.pos.pageId = pResult->pageId;
69,436✔
868
  openWin.pos.offset = pResult->offset;
69,436✔
869
  openWin.groupId = groupId;
69,436✔
870
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
69,436✔
871
  if (pn == NULL) {
69,436✔
872
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
64,669✔
873
    QUERY_CHECK_CODE(code, lino, _end);
64,669!
874
    return openWin.pos;
64,669✔
875
  }
876

877
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
4,767✔
878
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
4,767!
879
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
1✔
880
    QUERY_CHECK_CODE(code, lino, _end);
1!
881
  }
882

883
_end:
4,767✔
884
  if (code != TSDB_CODE_SUCCESS) {
4,767!
885
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
886
    pTaskInfo->code = code;
×
887
    T_LONG_JMP(pTaskInfo->env, code);
×
888
  }
889
  return openWin.pos;
4,767✔
890
}
891

892
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
2,047,739✔
893
  TSKEY* tsCols = NULL;
2,047,739✔
894

895
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
2,047,739!
896
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
2,047,745✔
897
    if (!pColDataInfo) {
2,047,547!
898
      pTaskInfo->code = terrno;
×
899
      T_LONG_JMP(pTaskInfo->env, terrno);
×
900
    }
901

902
    tsCols = (int64_t*)pColDataInfo->pData;
2,047,547✔
903
    if (tsCols[0] == 0) {
2,047,547✔
904
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
2!
905
            tsCols[pBlock->info.rows - 1]);
906
    }
907

908
    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
2,047,587!
909
      int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
822,437✔
910
      if (code != TSDB_CODE_SUCCESS) {
822,442!
911
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
912
        pTaskInfo->code = code;
×
913
        T_LONG_JMP(pTaskInfo->env, code);
×
914
      }
915
    }
916
  }
917

918
  return tsCols;
2,047,586✔
919
}
920

921
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
1,732,386✔
922
  if (OPTR_IS_OPENED(pOperator)) {
1,732,386✔
923
    return TSDB_CODE_SUCCESS;
308,643✔
924
  }
925

926
  int32_t        code = TSDB_CODE_SUCCESS;
1,423,743✔
927
  int32_t        lino = 0;
1,423,743✔
928
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,423,743✔
929
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1,423,743✔
930

931
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
1,423,743✔
932
  SExprSupp*                pSup = &pOperator->exprSupp;
1,423,743✔
933

934
  int32_t scanFlag = MAIN_SCAN;
1,423,743✔
935
  int64_t st = taosGetTimestampUs();
1,426,940✔
936

937
  pInfo->cleanGroupResInfo = false;
1,426,940✔
938
  while (1) {
1,658,817✔
939
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
3,085,757✔
940
    if (pBlock == NULL) {
3,087,650✔
941
      break;
1,429,115✔
942
    }
943

944
    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
1,658,535✔
945

946
    if (pInfo->scalarSupp.pExprInfo != NULL) {
1,658,535✔
947
      SExprSupp* pExprSup = &pInfo->scalarSupp;
20,856✔
948
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
20,856✔
949
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
20,856!
950
      QUERY_CHECK_CODE(code, lino, _end);
20,853!
951
    }
952

953
    // the pDataBlock are always the same one, no need to call this again
954
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
1,658,532✔
955
    QUERY_CHECK_CODE(code, lino, _end);
1,658,750!
956
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
1,658,750✔
957
  }
958

959
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
1,429,159✔
960
  QUERY_CHECK_CODE(code, lino, _end);
1,429,180!
961
  pInfo->cleanGroupResInfo = true;
1,429,180✔
962

963
  OPTR_SET_OPENED(pOperator);
1,429,180✔
964

965
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1,429,384✔
966

967
_end:
1,429,384✔
968
  if (code != TSDB_CODE_SUCCESS) {
1,429,384!
969
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
970
    pTaskInfo->code = code;
×
971
    T_LONG_JMP(pTaskInfo->env, code);
×
972
  }
973
  return code;
1,429,384✔
974
}
975

976
// start a new state window and record the start info
977
void doKeepNewStateWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList,
20,971,101✔
978
  int32_t rowIndex, uint64_t groupId, const EStateWinExtendOption* extendOption, bool hasPrevWin) {
979
  pRowSup->groupId = groupId;
20,971,101✔
980
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT ||
20,971,101✔
981
      *extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
92✔
982
    pRowSup->win.skey = hasPrevWin ? tsList[rowIndex] : tsList[0];
20,971,057✔
983
    pRowSup->startRowIndex = hasPrevWin ? rowIndex : 0;
20,971,057✔
984
    pRowSup->numOfRows = !hasPrevWin && hasContinuousNullRows(pRowSup) ?
21,031,260✔
985
      pRowSup->numNullRows : 0;
21,031,260✔
986
  } else {
987
    pRowSup->win.skey = hasPrevWin ? pRowSup->win.ekey + 1 : tsList[0];
44✔
988
    pRowSup->startRowIndex = hasContinuousNullRows(pRowSup) ?
44✔
989
      rowIndex - pRowSup->numNullRows : rowIndex;
44✔
990
    pRowSup->numOfRows = rowIndex - pRowSup->startRowIndex;
44✔
991
  }
992
  resetNumNullRows(pRowSup);
20,971,101✔
993
}
20,970,769✔
994

995
// close a state window and record its end info
996
// this functions is called when a new state row appears
997
// @param rowIndex the index of the first row of next window
998
void doKeepCurStateWindowEndInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, 
20,911,357✔
999
  int32_t rowIndex, const EStateWinExtendOption* extendOption) {
1000
  if (*extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
20,911,357✔
1001
      pRowSup->win.ekey = tsList[rowIndex] - 1;
33✔
1002
      // continuous rows having null state col should be included in this window
1003
      pRowSup->numOfRows += hasContinuousNullRows(pRowSup) ?
33✔
1004
        pRowSup->numNullRows : 0;
33✔
1005
      resetNumNullRows(pRowSup);
33✔
1006
  }
1007
}
20,911,357✔
1008

1009
void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, int32_t nullRowIndex) {
599,849✔
1010
  pRowSup->numNullRows += 1;
599,849✔
1011
}
599,849✔
1012

1013
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
80,151✔
1014
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
80,151✔
1015
  SExprSupp*     pSup = &pOperator->exprSupp;
80,151✔
1016

1017
  SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
80,151✔
1018
  if (!pStateColInfoData) {
80,151!
1019
    pTaskInfo->code = terrno;
×
1020
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1021
  }
1022
  int64_t gid = pBlock->info.id.groupId;
80,151✔
1023

1024
  bool    hasResult = false;
80,151✔
1025
  bool    masterScan = true;
80,151✔
1026
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
80,151✔
1027
  int32_t bytes = pStateColInfoData->info.bytes;
80,151✔
1028

1029
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
80,151✔
1030
  if (!pColInfoData) {
80,151!
1031
    pTaskInfo->code = terrno;
×
1032
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1033
  }
1034
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
80,151✔
1035

1036
  SWindowRowsSup* pRowSup = &pInfo->winSup;
80,151✔
1037
  pRowSup->numOfRows = 0;
80,151✔
1038
  pRowSup->startRowIndex = 0;
80,151✔
1039
  resetNumNullRows(pRowSup);
80,151✔
1040

1041
  struct SColumnDataAgg* pAgg = NULL;
80,151✔
1042
  EStateWinExtendOption extendOption = pInfo->extendOption;
80,151✔
1043
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
24,449,745✔
1044
    pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
24,369,536!
1045
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
48,739,072✔
1046
      doKeepStateWindowNullInfo(pRowSup, j);
599,849✔
1047
      continue;
599,849✔
1048
    }
1049
    hasResult = true;
23,769,687✔
1050
    if (pStateColInfoData->pData == NULL) {
23,769,687!
1051
      qError("%s:%d state column data is null", __FILE__, __LINE__);
×
1052
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1053
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1054
    }
1055

1056
    char* val = colDataGetData(pStateColInfoData, j);
23,769,687!
1057

1058
    if (gid != pRowSup->groupId || !pInfo->hasKey) {
23,769,687✔
1059
      // todo extract method
1060
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
59,205!
1061
        if (IS_STR_DATA_BLOB(pInfo->stateKey.type)) {
×
1062
          blobDataCopy(pInfo->stateKey.pData, val);
×
1063
        } else {
1064
          varDataCopy(pInfo->stateKey.pData, val);
61✔
1065
        }
1066
      } else {
1067
        memcpy(pInfo->stateKey.pData, val, bytes);
60,156✔
1068
      }
1069

1070
      pInfo->hasKey = true;
59,205✔
1071

1072
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid, &extendOption, false);
59,205✔
1073
      doKeepTuple(pRowSup, tsList[j], j, gid);
60,217✔
1074
    } else if (compareVal(val, &pInfo->stateKey)) {
23,710,482✔
1075
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,799,836✔
1076
    } else {  // a new state window started
1077
      SResultRow* pResult = NULL;
20,910,911✔
1078
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption);
20,910,911✔
1079

1080
      // keep the time window for the closed time window.
1081
      STimeWindow window = pRowSup->win;
20,911,212✔
1082

1083
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
20,911,212✔
1084
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1085
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
20,910,462!
1086
        T_LONG_JMP(pTaskInfo->env, ret);
×
1087
      }
1088

1089
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
20,910,462✔
1090
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
20,910,071✔
1091
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
20,910,071✔
1092
      if (ret != TSDB_CODE_SUCCESS) {
20,911,108!
1093
        T_LONG_JMP(pTaskInfo->env, ret);
×
1094
      }
1095

1096
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid, &extendOption, true);
20,911,108✔
1097
      doKeepTuple(pRowSup, tsList[j], j, gid);
20,910,429✔
1098

1099
      // todo extract method
1100
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
20,910,326!
1101
        if (IS_STR_DATA_BLOB(pInfo->stateKey.type)) {
97!
1102
          blobDataCopy(pInfo->stateKey.pData, val);
×
1103
        } else {
1104
          varDataCopy(pInfo->stateKey.pData, val);
97✔
1105
        }
1106
      } else {
1107
        memcpy(pInfo->stateKey.pData, val, bytes);
20,910,229✔
1108
      }
1109
    }
1110
  }
1111

1112
  if (!hasResult) {
80,209✔
1113
    return;
1,118✔
1114
  }
1115
  SResultRow* pResult = NULL;
79,091✔
1116
  // if window hasn't been closed, set end key to ts of last element
1117
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
79,091✔
1118
  if (hasContinuousNullRows(pRowSup)) {
79,091✔
1119
    // and all left rows should be included in the last window
1120
    pRowSup->numOfRows += pRowSup->numNullRows;
937✔
1121
    resetNumNullRows(pRowSup);
937✔
1122
  }
1123

1124
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
79,033✔
1125
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1126
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
79,033!
1127
    T_LONG_JMP(pTaskInfo->env, ret);
×
1128
  }
1129

1130
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
79,033✔
1131
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
79,032✔
1132
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
79,032✔
1133
  if (ret != TSDB_CODE_SUCCESS) {
79,033✔
1134
    T_LONG_JMP(pTaskInfo->env, ret);
2!
1135
  }
1136
}
1137

1138
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
72,607✔
1139
  if (OPTR_IS_OPENED(pOperator)) {
72,607✔
1140
    return TSDB_CODE_SUCCESS;
2,701✔
1141
  }
1142

1143
  int32_t                   code = TSDB_CODE_SUCCESS;
69,906✔
1144
  int32_t                   lino = 0;
69,906✔
1145
  SStateWindowOperatorInfo* pInfo = pOperator->info;
69,906✔
1146
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
69,906✔
1147

1148
  SExprSupp* pSup = &pOperator->exprSupp;
69,906✔
1149
  int32_t    order = pInfo->binfo.inputTsOrder;
69,906✔
1150
  int64_t    st = taosGetTimestampUs();
69,906✔
1151

1152
  SOperatorInfo* downstream = pOperator->pDownstream[0];
69,906✔
1153
  pInfo->cleanGroupResInfo = false;
69,906✔
1154
  while (1) {
80,149✔
1155
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
150,055✔
1156
    if (pBlock == NULL) {
150,052✔
1157
      break;
69,901✔
1158
    }
1159

1160
    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
80,151✔
1161
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
80,151✔
1162
    QUERY_CHECK_CODE(code, lino, _end);
80,151!
1163

1164
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
80,151✔
1165
    QUERY_CHECK_CODE(code, lino, _end);
80,151!
1166

1167
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
1168
    if (pInfo->scalarSup.pExprInfo != NULL) {
80,151✔
1169
      pTaskInfo->code =
476✔
1170
          projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
476✔
1171
                                pInfo->scalarSup.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
476!
1172
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
476!
1173
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1174
      }
1175
    }
1176

1177
    doStateWindowAggImpl(pOperator, pInfo, pBlock);
80,151✔
1178
  }
1179

1180
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
69,901✔
1181
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
69,901✔
1182
  QUERY_CHECK_CODE(code, lino, _end);
69,901!
1183
  pInfo->cleanGroupResInfo = true;
69,901✔
1184
  pOperator->status = OP_RES_TO_RETURN;
69,901✔
1185

1186
_end:
69,901✔
1187
  if (code != TSDB_CODE_SUCCESS) {
69,901!
1188
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1189
    pTaskInfo->code = code;
×
1190
    T_LONG_JMP(pTaskInfo->env, code);
×
1191
  }
1192
  return code;
69,901✔
1193
}
1194

1195
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
125,499✔
1196
  if (pOperator->status == OP_EXEC_DONE) {
125,499✔
1197
    (*ppRes) = NULL;
52,892✔
1198
    return TSDB_CODE_SUCCESS;
52,892✔
1199
  }
1200

1201
  int32_t                   code = TSDB_CODE_SUCCESS;
72,607✔
1202
  int32_t                   lino = 0;
72,607✔
1203
  SStateWindowOperatorInfo* pInfo = pOperator->info;
72,607✔
1204
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
72,607✔
1205
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;
72,607✔
1206

1207
  code = pOperator->fpSet._openFn(pOperator);
72,607✔
1208
  QUERY_CHECK_CODE(code, lino, _end);
72,602!
1209

1210
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
72,602✔
1211
  QUERY_CHECK_CODE(code, lino, _end);
72,602!
1212

1213
  while (1) {
×
1214
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
72,602✔
1215
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
72,601✔
1216
    QUERY_CHECK_CODE(code, lino, _end);
72,601!
1217

1218
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
72,601✔
1219
    if (!hasRemain) {
72,602✔
1220
      setOperatorCompleted(pOperator);
69,857✔
1221
      break;
69,857✔
1222
    }
1223

1224
    if (pBInfo->pRes->info.rows > 0) {
2,745!
1225
      break;
2,745✔
1226
    }
1227
  }
1228

1229
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
72,602✔
1230

1231
_end:
72,602✔
1232
  if (code != TSDB_CODE_SUCCESS) {
72,602!
1233
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1234
    pTaskInfo->code = code;
×
1235
    T_LONG_JMP(pTaskInfo->env, code);
×
1236
  }
1237
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
72,602✔
1238
  return code;
72,602✔
1239
}
1240

1241
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
2,525,646✔
1242
  int32_t                   code = TSDB_CODE_SUCCESS;
2,525,646✔
1243
  int32_t                   lino = 0;
2,525,646✔
1244
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
2,525,646✔
1245
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
2,525,646✔
1246

1247
  if (pOperator->status == OP_EXEC_DONE) {
2,525,646✔
1248
    (*ppRes) = NULL;
793,213✔
1249
    return code;
793,213✔
1250
  }
1251

1252
  SSDataBlock* pBlock = pInfo->binfo.pRes;
1,732,433✔
1253
  code = pOperator->fpSet._openFn(pOperator);
1,732,433✔
1254
  QUERY_CHECK_CODE(code, lino, _end);
1,737,831!
1255

1256
  while (1) {
×
1257
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,737,831✔
1258
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,737,602✔
1259
    QUERY_CHECK_CODE(code, lino, _end);
1,737,558!
1260

1261
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,737,558✔
1262
    if (!hasRemain) {
1,737,670✔
1263
      setOperatorCompleted(pOperator);
1,428,744✔
1264
      break;
1,428,976✔
1265
    }
1266

1267
    if (pBlock->info.rows > 0) {
308,926!
1268
      break;
308,926✔
1269
    }
1270
  }
1271

1272
  size_t rows = pBlock->info.rows;
1,737,902✔
1273
  pOperator->resultInfo.totalRows += rows;
1,737,902✔
1274

1275
_end:
1,737,902✔
1276
  if (code != TSDB_CODE_SUCCESS) {
1,737,902!
1277
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1278
    pTaskInfo->code = code;
×
1279
    T_LONG_JMP(pTaskInfo->env, code);
×
1280
  }
1281
  (*ppRes) = (rows == 0) ? NULL : pBlock;
1,737,902✔
1282
  return code;
1,737,902✔
1283
}
1284

1285
static void destroyStateWindowOperatorInfo(void* param) {
69,795✔
1286
  if (param == NULL) {
69,795!
1287
    return;
×
1288
  }
1289
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
69,795✔
1290
  cleanupBasicInfo(&pInfo->binfo);
69,795✔
1291
  taosMemoryFreeClear(pInfo->stateKey.pData);
69,795!
1292
  if (pInfo->pOperator) {
69,795!
1293
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
69,795✔
1294
                      pInfo->cleanGroupResInfo);
69,795✔
1295
    pInfo->pOperator = NULL;
69,795✔
1296
  }
1297

1298
  cleanupExprSupp(&pInfo->scalarSup);
69,795✔
1299
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
69,795✔
1300
  cleanupAggSup(&pInfo->aggSup);
69,795✔
1301
  cleanupGroupResInfo(&pInfo->groupResInfo);
69,795✔
1302

1303
  taosMemoryFreeClear(param);
69,795!
1304
}
1305

1306
static void freeItem(void* param) {
184,586✔
1307
  SGroupKeys* pKey = (SGroupKeys*)param;
184,586✔
1308
  taosMemoryFree(pKey->pData);
184,586!
1309
}
184,586✔
1310

1311
void destroyIntervalOperatorInfo(void* param) {
1,970,190✔
1312
  if (param == NULL) {
1,970,190!
1313
    return;
×
1314
  }
1315

1316
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
1,970,190✔
1317

1318
  cleanupBasicInfo(&pInfo->binfo);
1,970,190✔
1319
  if (pInfo->pOperator) {
1,970,370!
1320
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,970,373✔
1321
                      pInfo->cleanGroupResInfo);
1,970,373✔
1322
    pInfo->pOperator = NULL;
1,970,224✔
1323
  }
1324

1325
  cleanupAggSup(&pInfo->aggSup);
1,970,221✔
1326
  cleanupExprSupp(&pInfo->scalarSupp);
1,970,330✔
1327

1328
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
1,970,257✔
1329

1330
  taosArrayDestroy(pInfo->pInterpCols);
1,970,262✔
1331
  pInfo->pInterpCols = NULL;
1,970,047✔
1332

1333
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
1,970,047✔
1334
  pInfo->pPrevValues = NULL;
1,969,931✔
1335

1336
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,969,931✔
1337
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,970,335✔
1338
  destroyBoundedQueue(pInfo->pBQ);
1,970,345✔
1339
  taosMemoryFreeClear(param);
1,969,960!
1340
}
1341

1342
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
92,292✔
1343
  int32_t code = TSDB_CODE_SUCCESS;
92,292✔
1344
  int32_t lino = 0;
92,292✔
1345
  void*   tmp = NULL;
92,292✔
1346

1347
  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
92,292✔
1348
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
92,293!
1349

1350
  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
92,293✔
1351
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
92,293!
1352

1353
  {  // ts column
1354
    SColumn c = {0};
92,293✔
1355
    c.colId = 1;
92,293✔
1356
    c.slotId = pInfo->primaryTsIndex;
92,293✔
1357
    c.type = TSDB_DATA_TYPE_TIMESTAMP;
92,293✔
1358
    c.bytes = sizeof(int64_t);
92,293✔
1359
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
92,293✔
1360
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
92,291!
1361

1362
    SGroupKeys key;
1363
    key.bytes = c.bytes;
92,291✔
1364
    key.type = c.type;
92,291✔
1365
    key.isNull = true;  // to denote no value is assigned yet
92,291✔
1366
    key.pData = taosMemoryCalloc(1, c.bytes);
92,291!
1367
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
92,293!
1368

1369
    tmp = taosArrayPush(pInfo->pPrevValues, &key);
92,293✔
1370
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
92,292!
1371
  }
1372
_end:
92,292✔
1373
  if (code != TSDB_CODE_SUCCESS) {
92,292!
1374
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1375
  }
1376
  return code;
92,291✔
1377
}
1378

1379
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
1,966,698✔
1380
                                      bool* pRes) {
1381
  // the primary timestamp column
1382
  bool    needed = false;
1,966,698✔
1383
  int32_t code = TSDB_CODE_SUCCESS;
1,966,698✔
1384
  int32_t lino = 0;
1,966,698✔
1385
  void*   tmp = NULL;
1,966,698✔
1386

1387
  for (int32_t i = 0; i < numOfCols; ++i) {
6,273,105✔
1388
    SExprInfo* pExpr = pCtx[i].pExpr;
4,397,497✔
1389
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
4,397,497✔
1390
      needed = true;
92,292✔
1391
      break;
92,292✔
1392
    }
1393
  }
1394

1395
  if (needed) {
1,967,900✔
1396
    code = initWindowInterpPrevVal(pInfo);
92,292✔
1397
    QUERY_CHECK_CODE(code, lino, _end);
92,292!
1398
  }
1399

1400
  for (int32_t i = 0; i < numOfCols; ++i) {
6,363,236✔
1401
    SExprInfo* pExpr = pCtx[i].pExpr;
4,392,894✔
1402

1403
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
4,392,894✔
1404
      SFunctParam* pParam = &pExpr->base.pParam[0];
93,014✔
1405

1406
      SColumn c = *pParam->pCol;
93,014✔
1407
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
93,014✔
1408
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
92,293!
1409

1410
      SGroupKeys key = {0};
92,293✔
1411
      key.bytes = c.bytes;
92,293✔
1412
      key.type = c.type;
92,293✔
1413
      key.isNull = false;
92,293✔
1414
      key.pData = taosMemoryCalloc(1, c.bytes);
92,293!
1415
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
92,293!
1416

1417
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
92,293✔
1418
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
92,291!
1419
    }
1420
  }
1421

1422
_end:
1,970,342✔
1423
  if (code != TSDB_CODE_SUCCESS) {
1,970,342!
1424
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1425
  }
1426
  *pRes = needed;
1,968,107✔
1427
  return code;
1,968,107✔
1428
}
1429

1430
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo){
×
1431
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
1432
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
×
1433
  pOper->status = OP_NOT_OPENED;
×
1434

1435
  resetBasicOperatorState(&pIntervalInfo->binfo);
×
1436
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp, &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup,
×
1437
    pIntervalInfo->cleanGroupResInfo);
×
1438

1439
  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
×
1440
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
1441
  if (code == 0) {
×
1442
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
1443
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
1444
                       &pTaskInfo->storageAPI.functionStore);
1445
  }
1446
  if (code == 0) {
×
1447
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
×
1448
                         &pTaskInfo->storageAPI.functionStore);
1449
  }
1450

1451
  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL){
×
1452
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
×
1453
  }
1454

1455
  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
×
1456
    pIntervalInfo->curGroupId = UINT64_MAX;
×
1457
  }
1458

1459
  pIntervalInfo->cleanGroupResInfo = false;
×
1460
  pIntervalInfo->handledGroupNum = 0;
×
1461
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
×
1462
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;
×
1463

1464
  taosArrayDestroy(pIntervalInfo->pInterpCols);
×
1465
  pIntervalInfo->pInterpCols = NULL;
×
1466

1467
  if (pIntervalInfo->pPrevValues != NULL) {
×
1468
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
×
1469
    pIntervalInfo->pPrevValues = NULL;
×
1470
    code = initWindowInterpPrevVal(pIntervalInfo);
×
1471
  }
1472

1473
  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
×
1474
  destroyBoundedQueue(pIntervalInfo->pBQ);
×
1475
  pIntervalInfo->pBQ = NULL;
×
1476
  return code;
×
1477
}
1478

1479
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
×
1480
  SIntervalAggOperatorInfo* pInfo = pOper->info;
×
1481
  return resetInterval(pOper, pInfo);
×
1482
}
1483

1484
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
1,426,471✔
1485
                                   SOperatorInfo** pOptrInfo) {
1486
  QRY_PARAM_CHECK(pOptrInfo);
1,426,471!
1487

1488
  int32_t                   code = TSDB_CODE_SUCCESS;
1,426,471✔
1489
  int32_t                   lino = 0;
1,426,471✔
1490
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
1,426,471!
1491
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,427,685!
1492
  if (pInfo == NULL || pOperator == NULL) {
1,427,952!
1493
    code = terrno;
×
1494
    lino = __LINE__;
×
1495
    goto _error;
×
1496
  }
1497

1498
  pOperator->pPhyNode = pPhyNode;
1,428,031✔
1499
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
1,428,031✔
1500
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,427,330!
1501
  initBasicInfo(&pInfo->binfo, pResBlock);
1,427,330✔
1502

1503
  SExprSupp* pSup = &pOperator->exprSupp;
1,427,744✔
1504
  pSup->hasWindowOrGroup = true;
1,427,744✔
1505

1506
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
1,427,744✔
1507

1508
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,427,744✔
1509
  initResultSizeInfo(&pOperator->resultInfo, 512);
1,427,744✔
1510
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
1,428,246✔
1511
  QUERY_CHECK_CODE(code, lino, _error);
1,428,505!
1512

1513
  int32_t    num = 0;
1,428,505✔
1514
  SExprInfo* pExprInfo = NULL;
1,428,505✔
1515
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
1,428,505✔
1516
  QUERY_CHECK_CODE(code, lino, _error);
1,428,284!
1517

1518
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
1,428,284✔
1519
                    &pTaskInfo->storageAPI.functionStore);
1520
  QUERY_CHECK_CODE(code, lino, _error);
1,428,748!
1521

1522
  SInterval interval = {.interval = pPhyNode->interval,
1,428,748✔
1523
                        .sliding = pPhyNode->sliding,
1,428,748✔
1524
                        .intervalUnit = pPhyNode->intervalUnit,
1,428,748✔
1525
                        .slidingUnit = pPhyNode->slidingUnit,
1,428,748✔
1526
                        .offset = pPhyNode->offset,
1,428,748✔
1527
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
1,428,748✔
1528
                        .timeRange = pPhyNode->timeRange};
1529
  calcIntervalAutoOffset(&interval);
1,428,748✔
1530

1531
  STimeWindowAggSupp as = {
1,427,832✔
1532
      .waterMark = pPhyNode->window.watermark,
1,427,832✔
1533
      .calTrigger = pPhyNode->window.triggerType,
1,427,832✔
1534
      .maxTs = INT64_MIN,
1535
  };
1536

1537
  pInfo->win = pTaskInfo->window;
1,427,832✔
1538
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
1,427,832✔
1539
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
1,427,832✔
1540
  pInfo->interval = interval;
1,427,832✔
1541
  pInfo->twAggSup = as;
1,427,832✔
1542
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
1,427,832✔
1543
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
1,427,832!
1544
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
41,079✔
1545
    pInfo->limited = true;
41,079✔
1546
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
41,079✔
1547
  }
1548
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
1,427,832!
1549
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
136✔
1550
    pInfo->slimited = true;
136✔
1551
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
136✔
1552
    pInfo->curGroupId = UINT64_MAX;
136✔
1553
  }
1554

1555
  if (pPhyNode->window.pExprs != NULL) {
1,427,832✔
1556
    int32_t    numOfScalar = 0;
467✔
1557
    SExprInfo* pScalarExprInfo = NULL;
467✔
1558
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
467✔
1559
    QUERY_CHECK_CODE(code, lino, _error);
467!
1560

1561
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
467✔
1562
    if (code != TSDB_CODE_SUCCESS) {
467!
1563
      goto _error;
×
1564
    }
1565
  }
1566

1567
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,427,832✔
1568
                            pTaskInfo->pStreamRuntimeInfo);
1,427,832✔
1569
  if (code != TSDB_CODE_SUCCESS) {
1,425,679!
1570
    goto _error;
×
1571
  }
1572

1573
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
1,425,679✔
1574
  QUERY_CHECK_CODE(code, lino, _error);
1,428,184!
1575

1576
  pInfo->timeWindowInterpo = false;
1,428,184✔
1577
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
1,428,184✔
1578
  QUERY_CHECK_CODE(code, lino, _error);
1,427,193!
1579
  if (pInfo->timeWindowInterpo) {
1,427,193✔
1580
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
92,291✔
1581
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
92,293!
1582
      goto _error;
×
1583
    }
1584
  }
1585

1586
  pInfo->pOperator = pOperator;
1,427,195✔
1587
  pInfo->cleanGroupResInfo = false;
1,427,195✔
1588
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1,427,195✔
1589
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
1,424,455✔
1590
                  pInfo, pTaskInfo);
1591

1592
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
1,424,899✔
1593
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1594
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
1,424,828✔
1595
  code = appendDownstream(pOperator, &downstream, 1);
1,426,106✔
1596
  if (code != TSDB_CODE_SUCCESS) {
1,427,643!
1597
    goto _error;
×
1598
  }
1599

1600
  *pOptrInfo = pOperator;
1,427,643✔
1601
  return TSDB_CODE_SUCCESS;
1,427,643✔
1602

1603
_error:
×
1604
  if (pInfo != NULL) {
×
1605
    destroyIntervalOperatorInfo(pInfo);
×
1606
  }
1607

1608
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1609
  pTaskInfo->code = code;
×
1610
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
×
1611
  return code;
×
1612
}
1613

1614
// todo handle multiple timeline cases. assume no timeline interweaving
1615
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
103,335✔
1616
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
103,335✔
1617
  SExprSupp*     pSup = &pOperator->exprSupp;
103,335✔
1618

1619
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
103,335✔
1620
  if (!pColInfoData) {
103,334!
1621
    pTaskInfo->code = terrno;
×
1622
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1623
  }
1624

1625
  bool    masterScan = true;
103,335✔
1626
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
103,335✔
1627
  int64_t gid = pBlock->info.id.groupId;
103,335✔
1628

1629
  int64_t gap = pInfo->gap;
103,335✔
1630

1631
  if (!pInfo->reptScan) {
103,335✔
1632
    pInfo->reptScan = true;
76,179✔
1633
    pInfo->winSup.prevTs = INT64_MIN;
76,179✔
1634
  }
1635

1636
  SWindowRowsSup* pRowSup = &pInfo->winSup;
103,335✔
1637
  pRowSup->numOfRows = 0;
103,335✔
1638
  pRowSup->startRowIndex = 0;
103,335✔
1639

1640
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
1641
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
103,335✔
1642
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
35,110,605✔
1643
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
35,007,244✔
1644
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
82,591✔
1645
      doKeepTuple(pRowSup, tsList[j], j, gid);
82,562✔
1646
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
34,924,653✔
1647
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
9,015,773✔
1648
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1649
      doKeepTuple(pRowSup, tsList[j], j, gid);
25,908,940✔
1650
    } else {  // start a new session window
1651
      // start a new session window
1652
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
9,015,713✔
1653
        SResultRow* pResult = NULL;
9,008,607✔
1654

1655
        // keep the time window for the closed time window.
1656
        STimeWindow window = pRowSup->win;
9,008,607✔
1657

1658
        int32_t ret =
1659
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
9,008,607✔
1660
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1661
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
9,008,412!
1662
          T_LONG_JMP(pTaskInfo->env, ret);
×
1663
        }
1664

1665
        // pInfo->numOfRows data belong to the current session window
1666
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
9,008,412✔
1667
        ret =
1668
            applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
9,008,366✔
1669
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
9,008,366✔
1670
        if (ret != TSDB_CODE_SUCCESS) {
9,008,753!
1671
          T_LONG_JMP(pTaskInfo->env, ret);
×
1672
        }
1673
      }
1674

1675
      // here we start a new session window
1676
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
9,015,859✔
1677
      doKeepTuple(pRowSup, tsList[j], j, gid);
9,015,836✔
1678
    }
1679
  }
1680

1681
  SResultRow* pResult = NULL;
103,361✔
1682
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
103,361✔
1683
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
103,361✔
1684
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1685
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
103,334!
1686
    T_LONG_JMP(pTaskInfo->env, ret);
×
1687
  }
1688

1689
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
103,334✔
1690
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
103,333✔
1691
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
103,333✔
1692
  if (ret != TSDB_CODE_SUCCESS) {
103,334!
1693
    T_LONG_JMP(pTaskInfo->env, ret);
×
1694
  }
1695
}
103,334✔
1696

1697
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
168,726✔
1698
  if (pOperator->status == OP_EXEC_DONE) {
168,726✔
1699
    (*ppRes) = NULL;
75,350✔
1700
    return TSDB_CODE_SUCCESS;
75,350✔
1701
  }
1702

1703
  int32_t                  code = TSDB_CODE_SUCCESS;
93,376✔
1704
  int32_t                  lino = 0;
93,376✔
1705
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
93,376✔
1706
  SSessionAggOperatorInfo* pInfo = pOperator->info;
93,376✔
1707
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
93,376✔
1708
  SExprSupp*               pSup = &pOperator->exprSupp;
93,376✔
1709

1710
  if (pOperator->status == OP_RES_TO_RETURN) {
93,376✔
1711
    while (1) {
×
1712
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,472✔
1713
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,472✔
1714
      QUERY_CHECK_CODE(code, lino, _end);
1,472!
1715

1716
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,472✔
1717
      if (!hasRemain) {
1,472✔
1718
        setOperatorCompleted(pOperator);
21✔
1719
        break;
21✔
1720
      }
1721

1722
      if (pBInfo->pRes->info.rows > 0) {
1,451!
1723
        break;
1,451✔
1724
      }
1725
    }
1726
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1,472✔
1727
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1,472!
1728
    return code;
1,472✔
1729
  }
1730

1731
  int64_t st = taosGetTimestampUs();
91,905✔
1732
  int32_t order = pInfo->binfo.inputTsOrder;
91,905✔
1733

1734
  SOperatorInfo* downstream = pOperator->pDownstream[0];
91,905✔
1735

1736
  pInfo->cleanGroupResInfo = false;
91,905✔
1737
  while (1) {
103,334✔
1738
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
195,239✔
1739
    if (pBlock == NULL) {
195,238✔
1740
      break;
91,903✔
1741
    }
1742

1743
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
103,335✔
1744
    if (pInfo->scalarSupp.pExprInfo != NULL) {
103,335✔
1745
      SExprSupp* pExprSup = &pInfo->scalarSupp;
3✔
1746
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
3✔
1747
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
3!
1748
      QUERY_CHECK_CODE(code, lino, _end);
3!
1749
    }
1750
    // the pDataBlock are always the same one, no need to call this again
1751
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
103,335✔
1752
    QUERY_CHECK_CODE(code, lino, _end);
103,335!
1753

1754
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
103,335✔
1755
    QUERY_CHECK_CODE(code, lino, _end);
103,334!
1756

1757
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
103,334✔
1758
  }
1759

1760
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
91,903✔
1761

1762
  // restore the value
1763
  pOperator->status = OP_RES_TO_RETURN;
91,903✔
1764

1765
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
91,903✔
1766
  QUERY_CHECK_CODE(code, lino, _end);
91,905!
1767
  pInfo->cleanGroupResInfo = true;
91,905✔
1768

1769
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
91,905✔
1770
  QUERY_CHECK_CODE(code, lino, _end);
91,905!
1771
  while (1) {
×
1772
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
91,905✔
1773
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
91,904✔
1774
    QUERY_CHECK_CODE(code, lino, _end);
91,904!
1775

1776
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
91,904✔
1777
    if (!hasRemain) {
91,905✔
1778
      setOperatorCompleted(pOperator);
91,819✔
1779
      break;
91,819✔
1780
    }
1781

1782
    if (pBInfo->pRes->info.rows > 0) {
86!
1783
      break;
86✔
1784
    }
1785
  }
1786
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
91,905✔
1787

1788
_end:
91,905✔
1789
  if (code != TSDB_CODE_SUCCESS) {
91,905!
1790
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1791
    pTaskInfo->code = code;
×
1792
    T_LONG_JMP(pTaskInfo->env, code);
×
1793
  }
1794
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
91,905✔
1795
  return code;
91,905✔
1796
}
1797

1798
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
120✔
1799
  SStateWindowOperatorInfo* pInfo = pOper->info;
120✔
1800
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
120✔
1801
  SStateWindowPhysiNode* pPhynode = (SStateWindowPhysiNode*)pOper->pPhyNode;
120✔
1802
  pOper->status = OP_NOT_OPENED;
120✔
1803

1804
  resetBasicOperatorState(&pInfo->binfo);
120✔
1805
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
120✔
1806
                    pInfo->cleanGroupResInfo);
120✔
1807

1808
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
120✔
1809
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
120✔
1810
  if (code == 0) {
120!
1811
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
120✔
1812
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
120✔
1813
                       &pTaskInfo->storageAPI.functionStore);
1814
  }
1815
  if (code == 0) {
120!
1816
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->window.pExprs, NULL,
120✔
1817
                         &pTaskInfo->storageAPI.functionStore);
1818
  }
1819

1820
  pInfo->cleanGroupResInfo = false;
120✔
1821
  pInfo->hasKey = false;
120✔
1822

1823
  cleanupGroupResInfo(&pInfo->groupResInfo);
120✔
1824
  memset(pInfo->stateKey.pData, 0, pInfo->stateKey.bytes);
120✔
1825
  return code;
120✔
1826
}
1827

1828
// todo make this as an non-blocking operator
1829
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhysiNode* pStateNode,
69,795✔
1830
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1831
  QRY_PARAM_CHECK(pOptrInfo);
69,795!
1832

1833
  int32_t                   code = TSDB_CODE_SUCCESS;
69,795✔
1834
  int32_t                   lino = 0;
69,795✔
1835
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
69,795!
1836
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
69,795!
1837
  if (pInfo == NULL || pOperator == NULL) {
69,795!
1838
    code = terrno;
×
1839
    goto _error;
×
1840
  }
1841

1842
  pOperator->pPhyNode = pStateNode;
69,795✔
1843
  pOperator->exprSupp.hasWindowOrGroup = true;
69,795✔
1844
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
69,795✔
1845
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
69,795✔
1846

1847
  if (pStateNode->window.pExprs != NULL) {
69,795✔
1848
    int32_t    numOfScalarExpr = 0;
355✔
1849
    SExprInfo* pScalarExprInfo = NULL;
355✔
1850
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
355✔
1851
    QUERY_CHECK_CODE(code, lino, _error);
355!
1852

1853
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
355✔
1854
    if (code != TSDB_CODE_SUCCESS) {
355!
1855
      goto _error;
×
1856
    }
1857
  }
1858

1859
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
69,795✔
1860
  pInfo->stateKey.type = pInfo->stateCol.type;
69,795✔
1861
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
69,795✔
1862
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
69,795✔
1863
  if (pInfo->stateKey.pData == NULL) {
69,795!
1864
    goto _error;
×
1865
  }
1866
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
69,795✔
1867
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
69,795✔
1868

1869
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
69,795✔
1870
                            pTaskInfo->pStreamRuntimeInfo);
69,795✔
1871
  if (code != TSDB_CODE_SUCCESS) {
69,795!
1872
    goto _error;
×
1873
  }
1874

1875
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
69,795✔
1876

1877
  int32_t    num = 0;
69,795✔
1878
  SExprInfo* pExprInfo = NULL;
69,795✔
1879
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
69,795✔
1880
  QUERY_CHECK_CODE(code, lino, _error);
69,795!
1881

1882
  initResultSizeInfo(&pOperator->resultInfo, 4096);
69,795✔
1883

1884
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
69,795✔
1885
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
69,795✔
1886
  if (code != TSDB_CODE_SUCCESS) {
69,795!
1887
    goto _error;
×
1888
  }
1889

1890
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
69,795✔
1891
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
69,795!
1892
  initBasicInfo(&pInfo->binfo, pResBlock);
69,795✔
1893
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
69,795✔
1894

1895
  pInfo->twAggSup =
69,795✔
1896
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
69,795✔
1897

1898
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
69,795✔
1899
  QUERY_CHECK_CODE(code, lino, _error);
69,794!
1900

1901
  pInfo->tsSlotId = tsSlotId;
69,794✔
1902
  pInfo->pOperator = pOperator;
69,794✔
1903
  pInfo->cleanGroupResInfo = false;
69,794✔
1904
  pInfo->extendOption = pStateNode->extendOption;
69,794✔
1905
  pInfo->trueForLimit = pStateNode->trueForLimit;
69,794✔
1906
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
69,794✔
1907
                  pTaskInfo);
1908
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
69,795✔
1909
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1910
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
69,795✔
1911

1912
  code = appendDownstream(pOperator, &downstream, 1);
69,794✔
1913
  if (code != TSDB_CODE_SUCCESS) {
69,794!
1914
    goto _error;
×
1915
  }
1916

1917
  *pOptrInfo = pOperator;
69,794✔
1918
  return TSDB_CODE_SUCCESS;
69,794✔
1919

1920
_error:
×
1921
  if (pInfo != NULL) {
×
1922
    destroyStateWindowOperatorInfo(pInfo);
×
1923
  }
1924

1925
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1926
  pTaskInfo->code = code;
×
1927
  return code;
×
1928
}
1929

1930
void destroySWindowOperatorInfo(void* param) {
91,905✔
1931
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
91,905✔
1932
  if (pInfo == NULL) {
91,905!
1933
    return;
×
1934
  }
1935

1936
  cleanupBasicInfo(&pInfo->binfo);
91,905✔
1937
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
91,905✔
1938
  if (pInfo->pOperator) {
91,905!
1939
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
91,905✔
1940
                      pInfo->cleanGroupResInfo);
91,905✔
1941
    pInfo->pOperator = NULL;
91,905✔
1942
  }
1943

1944
  cleanupAggSup(&pInfo->aggSup);
91,905✔
1945
  cleanupExprSupp(&pInfo->scalarSupp);
91,905✔
1946

1947
  cleanupGroupResInfo(&pInfo->groupResInfo);
91,905✔
1948
  taosMemoryFreeClear(param);
91,905!
1949
}
1950

1951
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
×
1952
  SSessionAggOperatorInfo* pInfo = pOper->info;
×
1953
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
1954
  SSessionWinodwPhysiNode* pPhynode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
×
1955
  pOper->status = OP_NOT_OPENED;
×
1956

1957
  resetBasicOperatorState(&pInfo->binfo);
×
1958
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
×
1959
                    pInfo->cleanGroupResInfo);
×
1960

1961
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
×
1962
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
1963
  if (code == 0) {
×
1964
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
1965
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
1966
                       &pTaskInfo->storageAPI.functionStore);
1967
  }
1968
  if (code == 0) {
×
1969
    code = resetExprSupp(&pInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
×
1970
                         &pTaskInfo->storageAPI.functionStore);
1971
  }
1972

1973
  pInfo->cleanGroupResInfo = false;
×
1974
  pInfo->winSup = (SWindowRowsSup){0};
×
1975
  pInfo->winSup.prevTs = INT64_MIN;
×
1976
  pInfo->reptScan = false;
×
1977

1978
  cleanupGroupResInfo(&pInfo->groupResInfo);
×
1979
  return code;
×
1980
}
1981

1982
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
91,905✔
1983
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1984
  QRY_PARAM_CHECK(pOptrInfo);
91,905!
1985

1986
  int32_t                  code = TSDB_CODE_SUCCESS;
91,905✔
1987
  int32_t                  lino = 0;
91,905✔
1988
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
91,905!
1989
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
91,905!
1990
  if (pInfo == NULL || pOperator == NULL) {
91,904!
1991
    code = terrno;
×
1992
    goto _error;
×
1993
  }
1994

1995
  pOperator->pPhyNode = pSessionNode;
91,904✔
1996
  pOperator->exprSupp.hasWindowOrGroup = true;
91,904✔
1997

1998
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
91,904✔
1999
  initResultSizeInfo(&pOperator->resultInfo, 4096);
91,904✔
2000

2001
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
91,904✔
2002
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
91,904!
2003
  initBasicInfo(&pInfo->binfo, pResBlock);
91,904✔
2004

2005
  int32_t    numOfCols = 0;
91,904✔
2006
  SExprInfo* pExprInfo = NULL;
91,904✔
2007
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
91,904✔
2008
  QUERY_CHECK_CODE(code, lino, _error);
91,905!
2009

2010
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
91,905✔
2011
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
91,905✔
2012
  QUERY_CHECK_CODE(code, lino, _error);
91,905!
2013

2014
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
91,905✔
2015
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
91,905✔
2016
  pInfo->gap = pSessionNode->gap;
91,905✔
2017

2018
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
91,905✔
2019
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
91,905✔
2020
  QUERY_CHECK_CODE(code, lino, _error);
91,905!
2021

2022
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
91,905✔
2023
  pInfo->binfo.pRes = pResBlock;
91,905✔
2024
  pInfo->winSup.prevTs = INT64_MIN;
91,905✔
2025
  pInfo->reptScan = false;
91,905✔
2026
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
91,905✔
2027
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
91,905✔
2028

2029
  if (pSessionNode->window.pExprs != NULL) {
91,905✔
2030
    int32_t    numOfScalar = 0;
1✔
2031
    SExprInfo* pScalarExprInfo = NULL;
1✔
2032
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
1✔
2033
    QUERY_CHECK_CODE(code, lino, _error);
1!
2034

2035
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
1✔
2036
    QUERY_CHECK_CODE(code, lino, _error);
1!
2037
  }
2038

2039
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
91,905✔
2040
                            pTaskInfo->pStreamRuntimeInfo);
91,905✔
2041
  QUERY_CHECK_CODE(code, lino, _error);
91,905!
2042

2043
  pInfo->pOperator = pOperator;
91,905✔
2044
  pInfo->cleanGroupResInfo = false;
91,905✔
2045
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
91,905✔
2046
                  pInfo, pTaskInfo);
2047
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
91,905✔
2048
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2049
  pOperator->pTaskInfo = pTaskInfo;
91,905✔
2050
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
91,905✔
2051

2052
  code = appendDownstream(pOperator, &downstream, 1);
91,905✔
2053
  QUERY_CHECK_CODE(code, lino, _error);
91,905!
2054

2055
  *pOptrInfo = pOperator;
91,905✔
2056
  return TSDB_CODE_SUCCESS;
91,905✔
2057

2058
_error:
×
2059
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2060
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2061
  pTaskInfo->code = code;
×
2062
  return code;
×
2063
}
2064

2065
void destroyMAIOperatorInfo(void* param) {
540,913✔
2066
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
540,913✔
2067
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
540,913✔
2068
  taosMemoryFreeClear(param);
540,912!
2069
}
540,915✔
2070

2071
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
345,312✔
2072
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
345,312✔
2073
  if (NULL == pResult) {
345,314!
2074
    return pResult;
×
2075
  }
2076
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
345,314✔
2077
  return pResult;
345,314✔
2078
}
2079

2080
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
57,668,034✔
2081
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2082
  if (*pResult == NULL) {
57,668,034✔
2083
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
345,314✔
2084
    if (*pResult == NULL) {
345,314!
2085
      return terrno;
×
2086
    }
2087
  }
2088

2089
  // set time window for current result
2090
  (*pResult)->win = (*win);
57,668,034✔
2091
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
57,668,034✔
2092
}
2093

2094
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
388,978✔
2095
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
2096
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
388,978✔
2097
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
388,978✔
2098

2099
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
388,978✔
2100
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
388,978✔
2101
  SInterval*     pInterval = &iaInfo->interval;
388,978✔
2102

2103
  int32_t  startPos = 0;
388,978✔
2104
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
388,978✔
2105

2106
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
388,976✔
2107

2108
  // there is an result exists
2109
  if (miaInfo->curTs != INT64_MIN) {
388,977✔
2110
    if (ts != miaInfo->curTs) {
40,033✔
2111
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
22,790✔
2112
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
22,790✔
2113
      miaInfo->curTs = ts;
22,790✔
2114
    }
2115
  } else {
2116
    miaInfo->curTs = ts;
348,944✔
2117
  }
2118

2119
  STimeWindow win = {0};
388,977✔
2120
  win.skey = miaInfo->curTs;
388,977✔
2121
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
388,977✔
2122

2123
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
388,980✔
2124
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
388,980!
2125
    T_LONG_JMP(pTaskInfo->env, ret);
×
2126
  }
2127

2128
  int32_t currPos = startPos;
388,980✔
2129

2130
  STimeWindow currWin = win;
388,980✔
2131
  while (++currPos < pBlock->info.rows) {
87,109,688✔
2132
    if (tsCols[currPos] == miaInfo->curTs) {
86,720,311✔
2133
      continue;
29,444,872✔
2134
    }
2135

2136
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
57,275,439✔
2137
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
57,272,871✔
2138
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
57,272,871✔
2139
    if (ret != TSDB_CODE_SUCCESS) {
57,290,461!
2140
      T_LONG_JMP(pTaskInfo->env, ret);
×
2141
    }
2142

2143
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
57,290,461✔
2144
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
57,186,314✔
2145
    miaInfo->curTs = tsCols[currPos];
57,246,380✔
2146

2147
    currWin.skey = miaInfo->curTs;
57,246,380✔
2148
    currWin.ekey =
57,266,530✔
2149
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
57,246,380✔
2150

2151
    startPos = currPos;
57,266,530✔
2152
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
57,266,530✔
2153
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
57,275,414!
2154
      T_LONG_JMP(pTaskInfo->env, ret);
×
2155
    }
2156

2157
    miaInfo->curTs = currWin.skey;
57,275,836✔
2158
  }
2159

2160
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
389,377✔
2161
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
388,979✔
2162
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
388,979✔
2163
  if (ret != TSDB_CODE_SUCCESS) {
388,978!
2164
    T_LONG_JMP(pTaskInfo->env, ret);
×
2165
  }
2166
}
388,978✔
2167

2168
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
348,559✔
2169
  pRes->info.id.groupId = pMiaInfo->groupId;
348,559✔
2170
  pMiaInfo->curTs = INT64_MIN;
348,559✔
2171
  pMiaInfo->groupId = 0;
348,559✔
2172
}
348,559✔
2173

2174
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
599,012✔
2175
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
599,012✔
2176
  int32_t                               code = TSDB_CODE_SUCCESS;
599,012✔
2177
  int32_t                               lino = 0;
599,012✔
2178
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
599,012✔
2179
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
599,012✔
2180

2181
  SExprSupp*      pSup = &pOperator->exprSupp;
599,012✔
2182
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
599,012✔
2183
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
599,012✔
2184
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
599,012✔
2185

2186
  while (1) {
334,090✔
2187
    SSDataBlock* pBlock = NULL;
933,102✔
2188
    if (pMiaInfo->prefetchedBlock == NULL) {
933,102✔
2189
      pBlock = getNextBlockFromDownstream(pOperator, 0);
929,471✔
2190
    } else {
2191
      pBlock = pMiaInfo->prefetchedBlock;
3,631✔
2192
      pMiaInfo->prefetchedBlock = NULL;
3,631✔
2193

2194
      pMiaInfo->groupId = pBlock->info.id.groupId;
3,631✔
2195
    }
2196

2197
    // no data exists, all query processing is done
2198
    if (pBlock == NULL) {
933,100✔
2199
      // close last unclosed time window
2200
      if (pMiaInfo->curTs != INT64_MIN) {
540,500✔
2201
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
344,934✔
2202
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
344,927✔
2203
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
344,929✔
2204
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
344,929✔
2205
        QUERY_CHECK_CODE(code, lino, _end);
344,928!
2206
      }
2207

2208
      setOperatorCompleted(pOperator);
540,494✔
2209
      break;
540,496✔
2210
    }
2211

2212
    if (pMiaInfo->groupId == 0) {
392,600✔
2213
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
384,748✔
2214
        pMiaInfo->groupId = pBlock->info.id.groupId;
494✔
2215
        pRes->info.id.groupId = pMiaInfo->groupId;
494✔
2216
      }
2217
    } else {
2218
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
7,852✔
2219
        // if there are unclosed time window, close it firstly.
2220
        if (pMiaInfo->curTs == INT64_MIN) {
3,631!
2221
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2222
          T_LONG_JMP(pTaskInfo->env, terrno);
×
2223
        }
2224
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
3,631✔
2225
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
3,631✔
2226

2227
        pMiaInfo->prefetchedBlock = pBlock;
3,631✔
2228
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
3,631✔
2229
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
3,631✔
2230
        QUERY_CHECK_CODE(code, lino, _end);
3,631!
2231
        if (pRes->info.rows == 0) {
3,631✔
2232
          // After filtering for last group, the result is empty, so we need to continue to process next group
2233
          continue;
50✔
2234
        } else {
2235
          break;
3,581✔
2236
        }
2237
      } else {
2238
        // continue
2239
        pRes->info.id.groupId = pMiaInfo->groupId;
4,221✔
2240
      }
2241
    }
2242

2243
    pRes->info.scanFlag = pBlock->info.scanFlag;
388,969✔
2244
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
388,969✔
2245
    QUERY_CHECK_CODE(code, lino, _end);
388,979!
2246

2247
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
388,979✔
2248

2249
    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
388,978✔
2250
    QUERY_CHECK_CODE(code, lino, _end);
388,986!
2251

2252
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
388,986✔
2253
      break;
54,946✔
2254
    }
2255
  }
2256

2257
_end:
599,023✔
2258
  if (code != TSDB_CODE_SUCCESS) {
599,023!
2259
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2260
    pTaskInfo->code = code;
×
2261
    T_LONG_JMP(pTaskInfo->env, code);
×
2262
  }
2263
}
599,023✔
2264

2265
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,224,649✔
2266
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
1,224,649✔
2267
  int32_t                               code = TSDB_CODE_SUCCESS;
1,224,649✔
2268
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
1,224,649✔
2269
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
1,224,649✔
2270
  if (pOperator->status == OP_EXEC_DONE) {
1,224,649✔
2271
    (*ppRes) = NULL;
626,693✔
2272
    return code;
626,693✔
2273
  }
2274

2275
  SSDataBlock* pRes = iaInfo->binfo.pRes;
597,956✔
2276
  blockDataCleanup(pRes);
597,956✔
2277

2278
  if (iaInfo->binfo.mergeResultBlock) {
597,974✔
2279
    while (1) {
2280
      if (pOperator->status == OP_EXEC_DONE) {
504,264✔
2281
        break;
204,486✔
2282
      }
2283

2284
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
299,778✔
2285
        break;
47,126✔
2286
      }
2287

2288
      doMergeAlignedIntervalAgg(pOperator);
252,652✔
2289
    }
2290
  } else {
2291
    doMergeAlignedIntervalAgg(pOperator);
346,364✔
2292
  }
2293

2294
  size_t rows = pRes->info.rows;
597,983✔
2295
  pOperator->resultInfo.totalRows += rows;
597,983✔
2296
  (*ppRes) = (rows == 0) ? NULL : pRes;
597,983✔
2297
  return code;
597,983✔
2298
}
2299

2300
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
×
2301
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;
×
2302
  
2303
  uint64_t     groupId;  // current groupId
2304
  int64_t      curTs;    // current ts
2305
  SSDataBlock* prefetchedBlock;
2306
  SResultRow*  pResultRow;
2307

2308
  pInfo->groupId = 0;
×
2309
  pInfo->curTs = INT64_MIN;
×
2310
  pInfo->prefetchedBlock = NULL;
×
2311
  pInfo->pResultRow = NULL;
×
2312

2313
  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
×
2314
}
2315

2316
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
540,905✔
2317
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2318
  QRY_PARAM_CHECK(pOptrInfo);
540,905!
2319

2320
  int32_t                               code = TSDB_CODE_SUCCESS;
540,905✔
2321
  int32_t                               lino = 0;
540,905✔
2322
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
540,905!
2323
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
540,914!
2324
  if (miaInfo == NULL || pOperator == NULL) {
540,903!
2325
    code = terrno;
×
2326
    goto _error;
×
2327
  }
2328

2329
  pOperator->pPhyNode = pNode;
540,903✔
2330
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
540,903!
2331
  if (miaInfo->intervalAggOperatorInfo == NULL) {
540,904!
2332
    code = terrno;
×
2333
    goto _error;
×
2334
  }
2335

2336
  SInterval interval = {.interval = pNode->interval,
540,904✔
2337
                        .sliding = pNode->sliding,
540,904✔
2338
                        .intervalUnit = pNode->intervalUnit,
540,904✔
2339
                        .slidingUnit = pNode->slidingUnit,
540,904✔
2340
                        .offset = pNode->offset,
540,904✔
2341
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
540,904✔
2342
                        .timeRange = pNode->timeRange};
2343
  calcIntervalAutoOffset(&interval);
540,904✔
2344

2345
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
540,908✔
2346
  SExprSupp*                pSup = &pOperator->exprSupp;
540,908✔
2347
  pSup->hasWindowOrGroup = true;
540,908✔
2348

2349
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
540,908✔
2350
                            pTaskInfo->pStreamRuntimeInfo);
540,908✔
2351
  QUERY_CHECK_CODE(code, lino, _error);
540,902!
2352

2353
  miaInfo->curTs = INT64_MIN;
540,902✔
2354
  iaInfo->win = pTaskInfo->window;
540,902✔
2355
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
540,902✔
2356
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
540,902✔
2357
  iaInfo->interval = interval;
540,902✔
2358
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
540,902✔
2359
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
540,902✔
2360

2361
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
540,902✔
2362
  initResultSizeInfo(&pOperator->resultInfo, 512);
540,902✔
2363

2364
  int32_t    num = 0;
540,907✔
2365
  SExprInfo* pExprInfo = NULL;
540,907✔
2366
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
540,907✔
2367
  QUERY_CHECK_CODE(code, lino, _error);
540,913!
2368

2369
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
540,913✔
2370
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
540,913✔
2371
  QUERY_CHECK_CODE(code, lino, _error);
540,911!
2372

2373
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
540,911✔
2374
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
540,903!
2375
  initBasicInfo(&iaInfo->binfo, pResBlock);
540,903✔
2376
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
540,904✔
2377
  QUERY_CHECK_CODE(code, lino, _error);
540,911!
2378

2379
  iaInfo->timeWindowInterpo = false;
540,911✔
2380
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
540,911✔
2381
  QUERY_CHECK_CODE(code, lino, _error);
540,901!
2382
  if (iaInfo->timeWindowInterpo) {
540,901!
2383
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2384
  }
2385

2386
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
540,901✔
2387
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
540,891✔
2388
  QUERY_CHECK_CODE(code, lino, _error);
540,912!
2389
  iaInfo->pOperator = pOperator;
540,912✔
2390
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
540,912✔
2391
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2392

2393
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
540,912✔
2394
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2395
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
540,913✔
2396

2397
  code = appendDownstream(pOperator, &downstream, 1);
540,913✔
2398
  QUERY_CHECK_CODE(code, lino, _error);
540,913!
2399

2400
  *pOptrInfo = pOperator;
540,913✔
2401
  return TSDB_CODE_SUCCESS;
540,913✔
2402

2403
_error:
×
2404
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2405
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2406
  pTaskInfo->code = code;
×
2407
  return code;
×
2408
}
2409

2410
//=====================================================================================================================
2411
// merge interval operator
2412
typedef struct SMergeIntervalAggOperatorInfo {
2413
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2414
  SList*                   groupIntervals;
2415
  SListIter                groupIntervalsIter;
2416
  bool                     hasGroupId;
2417
  uint64_t                 groupId;
2418
  SSDataBlock*             prefetchedBlock;
2419
  bool                     inputBlocksFinished;
2420
} SMergeIntervalAggOperatorInfo;
2421

2422
typedef struct SGroupTimeWindow {
2423
  uint64_t    groupId;
2424
  STimeWindow window;
2425
} SGroupTimeWindow;
2426

2427
void destroyMergeIntervalOperatorInfo(void* param) {
×
2428
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2429
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2430
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2431

2432
  taosMemoryFreeClear(param);
×
2433
}
×
2434

2435
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2436
                                        STimeWindow* newWin) {
2437
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2438
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2439
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2440

2441
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2442
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2443
  if (code != TSDB_CODE_SUCCESS) {
×
2444
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2445
    return code;
×
2446
  }
2447

2448
  SListIter iter = {0};
×
2449
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2450
  SListNode* listNode = NULL;
×
2451
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2452
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2453
    if (prevGrpWin->groupId != tableGroupId) {
×
2454
      continue;
×
2455
    }
2456

2457
    STimeWindow* prevWin = &prevGrpWin->window;
×
2458
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2459
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2460
      taosMemoryFreeClear(tmp);
×
2461
    }
2462
  }
2463

2464
  return TSDB_CODE_SUCCESS;
×
2465
}
2466

2467
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
2468
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
2469
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2470
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2471

2472
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
×
2473
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;
×
2474

2475
  int32_t     startPos = 0;
×
2476
  int32_t     numOfOutput = pExprSup->numOfExprs;
×
2477
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
×
2478
  uint64_t    tableGroupId = pBlock->info.id.groupId;
×
2479
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2480
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
×
2481
  SResultRow* pResult = NULL;
×
2482

2483
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
×
2484
                                        iaInfo->binfo.inputTsOrder);
2485

2486
  int32_t ret =
2487
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2488
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2489
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2490
    T_LONG_JMP(pTaskInfo->env, ret);
×
2491
  }
2492

2493
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
×
2494
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2495
                                                 iaInfo->binfo.inputTsOrder);
2496
  if (forwardRows <= 0) {
×
2497
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2498
  }
2499

2500
  // prev time window not interpolation yet.
2501
  if (iaInfo->timeWindowInterpo) {
×
2502
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
×
2503
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
×
2504

2505
    // restore current time window
2506
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2507
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2508
    if (ret != TSDB_CODE_SUCCESS) {
×
2509
      T_LONG_JMP(pTaskInfo->env, ret);
×
2510
    }
2511

2512
    // window start key interpolation
2513
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
×
2514
    if (ret != TSDB_CODE_SUCCESS) {
×
2515
      T_LONG_JMP(pTaskInfo->env, ret);
×
2516
    }
2517
  }
2518

2519
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
×
2520
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2521
                                        forwardRows, pBlock->info.rows, numOfOutput);
×
2522
  if (ret != TSDB_CODE_SUCCESS) {
×
2523
    T_LONG_JMP(pTaskInfo->env, ret);
×
2524
  }
2525
  doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2526

2527
  // output previous interval results after this interval (&win) is closed
2528
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
×
2529
  if (code != TSDB_CODE_SUCCESS) {
×
2530
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2531
    T_LONG_JMP(pTaskInfo->env, code);
×
2532
  }
2533

2534
  STimeWindow nextWin = win;
×
2535
  while (1) {
×
2536
    int32_t prevEndPos = forwardRows - 1 + startPos;
×
2537
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
×
2538
                                      iaInfo->binfo.inputTsOrder);
2539
    if (startPos < 0) {
×
2540
      break;
×
2541
    }
2542

2543
    // null data, failed to allocate more memory buffer
2544
    code =
2545
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
×
2546
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2547
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2548
      T_LONG_JMP(pTaskInfo->env, code);
×
2549
    }
2550

2551
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
×
2552
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2553
                                           iaInfo->binfo.inputTsOrder);
2554

2555
    // window start(end) key interpolation
2556
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
×
2557
    if (code != TSDB_CODE_SUCCESS) {
×
2558
      T_LONG_JMP(pTaskInfo->env, code);
×
2559
    }
2560

2561
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
×
2562
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2563
                                           forwardRows, pBlock->info.rows, numOfOutput);
×
2564
    if (code != TSDB_CODE_SUCCESS) {
×
2565
      T_LONG_JMP(pTaskInfo->env, code);
×
2566
    }
2567
    doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2568

2569
    // output previous interval results after this interval (&nextWin) is closed
2570
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
×
2571
    if (code != TSDB_CODE_SUCCESS) {
×
2572
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2573
      T_LONG_JMP(pTaskInfo->env, code);
×
2574
    }
2575
  }
2576

2577
  if (iaInfo->timeWindowInterpo) {
×
2578
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
×
2579
  }
2580
}
×
2581

2582
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
2583
  int32_t        code = TSDB_CODE_SUCCESS;
×
2584
  int32_t        lino = 0;
×
2585
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
2586

2587
  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
×
2588
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2589
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;
×
2590

2591
  if (pOperator->status == OP_EXEC_DONE) {
×
2592
    (*ppRes) = NULL;
×
2593
    return code;
×
2594
  }
2595

2596
  SSDataBlock* pRes = iaInfo->binfo.pRes;
×
2597
  blockDataCleanup(pRes);
×
2598
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
×
2599
  QUERY_CHECK_CODE(code, lino, _end);
×
2600

2601
  if (!miaInfo->inputBlocksFinished) {
×
2602
    SOperatorInfo* downstream = pOperator->pDownstream[0];
×
2603
    while (1) {
×
2604
      SSDataBlock* pBlock = NULL;
×
2605
      if (miaInfo->prefetchedBlock == NULL) {
×
2606
        pBlock = getNextBlockFromDownstream(pOperator, 0);
×
2607
      } else {
2608
        pBlock = miaInfo->prefetchedBlock;
×
2609
        miaInfo->groupId = pBlock->info.id.groupId;
×
2610
        miaInfo->prefetchedBlock = NULL;
×
2611
      }
2612

2613
      if (pBlock == NULL) {
×
2614
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
×
2615
        miaInfo->inputBlocksFinished = true;
×
2616
        break;
×
2617
      }
2618

2619
      if (!miaInfo->hasGroupId) {
×
2620
        miaInfo->hasGroupId = true;
×
2621
        miaInfo->groupId = pBlock->info.id.groupId;
×
2622
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
×
2623
        miaInfo->prefetchedBlock = pBlock;
×
2624
        break;
×
2625
      }
2626

2627
      pRes->info.scanFlag = pBlock->info.scanFlag;
×
2628
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
×
2629
      QUERY_CHECK_CODE(code, lino, _end);
×
2630

2631
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);
×
2632

2633
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
×
2634
        break;
×
2635
      }
2636
    }
2637

2638
    pRes->info.id.groupId = miaInfo->groupId;
×
2639
  }
2640

2641
  if (miaInfo->inputBlocksFinished) {
×
2642
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
×
2643

2644
    if (listNode != NULL) {
×
2645
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
×
2646
      pRes->info.id.groupId = grpWin->groupId;
×
2647
    }
2648
  }
2649

2650
  if (pRes->info.rows == 0) {
×
2651
    setOperatorCompleted(pOperator);
×
2652
  }
2653

2654
  size_t rows = pRes->info.rows;
×
2655
  pOperator->resultInfo.totalRows += rows;
×
2656

2657
_end:
×
2658
  if (code != TSDB_CODE_SUCCESS) {
×
2659
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2660
    pTaskInfo->code = code;
×
2661
    T_LONG_JMP(pTaskInfo->env, code);
×
2662
  }
2663
  (*ppRes) = (rows == 0) ? NULL : pRes;
×
2664
  return code;
×
2665
}
2666

2667
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
×
2668
  SMergeIntervalAggOperatorInfo* pInfo = pOper->info;
×
2669

2670
  pInfo->hasGroupId = false;
×
2671
  pInfo->groupId = 0;
×
2672
  pInfo->prefetchedBlock = NULL;
×
2673
  pInfo->inputBlocksFinished = false;
×
2674
  tdListEmpty(pInfo->groupIntervals);
×
2675
  
2676
  SIntervalAggOperatorInfo* pIntervalInfo = &pInfo->intervalAggOperatorInfo;
×
2677
  return resetInterval(pOper, pIntervalInfo);
×
2678
}
2679

2680
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2681
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2682
  QRY_PARAM_CHECK(pOptrInfo);
×
2683

2684
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2685
  int32_t                        lino = 0;
×
2686
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2687
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2688
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2689
    code = terrno;
×
2690
    goto _error;
×
2691
  }
2692

2693
  pOperator->pPhyNode = pIntervalPhyNode;
×
2694
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2695
                        .sliding = pIntervalPhyNode->sliding,
×
2696
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2697
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2698
                        .offset = pIntervalPhyNode->offset,
×
2699
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2700
                        .timeRange = pIntervalPhyNode->timeRange};
2701
  calcIntervalAutoOffset(&interval);
×
2702

2703
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2704

2705
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2706
  pIntervalInfo->win = pTaskInfo->window;
×
2707
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2708
  pIntervalInfo->interval = interval;
×
2709
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2710
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2711
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2712

2713
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2714
  pExprSupp->hasWindowOrGroup = true;
×
2715

2716
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2717
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2718

2719
  int32_t    num = 0;
×
2720
  SExprInfo* pExprInfo = NULL;
×
2721
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2722
  QUERY_CHECK_CODE(code, lino, _error);
×
2723

2724
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2725
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2726
  if (code != TSDB_CODE_SUCCESS) {
×
2727
    goto _error;
×
2728
  }
2729

2730
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2731
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2732
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2733
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2734
  QUERY_CHECK_CODE(code, lino, _error);
×
2735

2736
  pIntervalInfo->timeWindowInterpo = false;
×
2737
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2738
  QUERY_CHECK_CODE(code, lino, _error);
×
2739
  if (pIntervalInfo->timeWindowInterpo) {
×
2740
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2741
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2742
      goto _error;
×
2743
    }
2744
  }
2745

2746
  pIntervalInfo->pOperator = pOperator;
×
2747
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2748
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2749
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2750
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2751
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2752
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
2753

2754
  code = appendDownstream(pOperator, &downstream, 1);
×
2755
  if (code != TSDB_CODE_SUCCESS) {
×
2756
    goto _error;
×
2757
  }
2758

2759
  *pOptrInfo = pOperator;
×
2760
  return TSDB_CODE_SUCCESS;
×
2761
_error:
×
2762
  if (pMergeIntervalInfo != NULL) {
×
2763
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2764
  }
2765
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2766
  pTaskInfo->code = code;
×
2767
  return code;
×
2768
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc