• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4917

07 Jan 2026 03:52PM UTC coverage: 65.42% (+0.02%) from 65.402%
#4917

push

travis-ci

web-flow
merge: from main to 3.0 branch #34204

31 of 34 new or added lines in 2 files covered. (91.18%)

819 existing lines in 129 files now uncovered.

202679 of 309814 relevant lines covered (65.42%)

116724351.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.53
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,
34
  RESULT_ROW_END_INTERP = 2,
35
} SResultTsInterpType;
36

37
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;
39
  uint64_t           groupId;
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
2,147,483,647✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
2,147,483,647✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
2,147,483,647✔
56
    *pResult = NULL;
877,912✔
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
×
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
2,147,483,647✔
63
  *pResult = pResultRow;
2,147,483,647✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
2,147,483,647✔
65
}
66

67
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, int32_t rowIndex, uint64_t groupId) {
504,916,756✔
68
  pRowSup->win.ekey = ts;
504,916,756✔
69
  pRowSup->prevTs = ts;
504,916,756✔
70
  pRowSup->groupId = groupId;
504,916,388✔
71
  pRowSup->numOfRows += 1;
504,916,388✔
72
  if (hasContinuousNullRows(pRowSup)) {
504,916,307✔
73
    // rows having null state col are wrapped by rows of same state
74
    // these rows can be counted into current window
75
    pRowSup->numOfRows += pRowSup->numNullRows;
4,109,511✔
76
    resetNumNullRows(pRowSup);
4,109,511✔
77
  }
78
}
504,915,571✔
79

80
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
163,946,860✔
81
  pRowSup->startRowIndex = rowIndex;
163,946,860✔
82
  pRowSup->numOfRows = 0;
163,946,860✔
83
  pRowSup->win.skey = tsList[rowIndex];
163,946,860✔
84
  pRowSup->groupId = groupId;
163,946,492✔
85
  resetNumNullRows(pRowSup);
163,946,860✔
86
}
163,945,675✔
87

88
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
×
89
                                            int32_t order, int64_t* pData) {
90
  int32_t forwardRows = 0;
2,050,817,797✔
91

92
  if (order == TSDB_ORDER_ASC) {
×
93
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
1,691,347,005✔
94
    if (end >= 0) {
1,681,190,604✔
95
      forwardRows = end;
1,677,248,504✔
96

97
      while (pData[end + pos] == ekey) {
1,683,224,482✔
98
        forwardRows += 1;
5,975,978✔
99
        ++pos;
5,975,978✔
100
      }
101
    }
102
  } else {
103
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
359,470,792✔
104
    if (end >= 0) {
358,976,410✔
105
      forwardRows = end;
359,069,909✔
106

107
      while (pData[end + pos] == ekey) {
713,624,782✔
108
        forwardRows += 1;
354,554,873✔
109
        ++pos;
354,554,873✔
110
      }
111
    }
112
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
113
    //    if (end >= 0) {
114
    //      forwardRows = pos - end;
115
    //
116
    //      if (pData[end] == ekey) {
117
    //        forwardRows += 1;
118
    //      }
119
    //    }
120
  }
121

122
  return forwardRows;
2,039,957,009✔
123
}
124

125
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
2,128,694,377✔
126
  int32_t midPos = -1;
2,128,694,377✔
127
  int32_t numOfRows;
128

129
  if (num <= 0) {
2,128,694,377✔
130
    return -1;
×
131
  }
132

133
  TSKEY*  keyList = (TSKEY*)pValue;
2,128,694,377✔
134
  int32_t firstPos = 0;
2,128,694,377✔
135
  int32_t lastPos = num - 1;
2,128,694,377✔
136

137
  if (order == TSDB_ORDER_DESC) {
2,128,694,377✔
138
    // find the first position which is smaller than the key
139
    while (1) {
140
      if (key >= keyList[firstPos]) return firstPos;
415,064,200✔
141
      if (key == keyList[lastPos]) return lastPos;
62,510,732✔
142

143
      if (key < keyList[lastPos]) {
62,061,178✔
144
        lastPos += 1;
6,600,244✔
145
        if (lastPos >= num) {
6,600,244✔
146
          return -1;
×
147
        } else {
148
          return lastPos;
6,600,244✔
149
        }
150
      }
151

152
      numOfRows = lastPos - firstPos + 1;
55,460,934✔
153
      midPos = (numOfRows >> 1) + firstPos;
55,460,934✔
154

155
      if (key < keyList[midPos]) {
55,460,934✔
156
        firstPos = midPos + 1;
1,967,979✔
157
      } else if (key > keyList[midPos]) {
53,492,955✔
158
        lastPos = midPos - 1;
52,989,380✔
159
      } else {
160
        break;
503,575✔
161
      }
162
    }
163

164
  } else {
165
    // find the first position which is bigger than the key
166
    while (1) {
167
      if (key <= keyList[firstPos]) return firstPos;
2,147,483,647✔
168
      if (key == keyList[lastPos]) return lastPos;
2,147,483,647✔
169

170
      if (key > keyList[lastPos]) {
2,147,483,647✔
171
        lastPos = lastPos + 1;
1,597,749,724✔
172
        if (lastPos >= num)
1,597,749,724✔
173
          return -1;
962,650✔
174
        else
175
          return lastPos;
1,596,787,074✔
176
      }
177

178
      numOfRows = lastPos - firstPos + 1;
2,147,483,647✔
179
      midPos = (numOfRows >> 1u) + firstPos;
2,147,483,647✔
180

181
      if (key < keyList[midPos]) {
2,147,483,647✔
182
        lastPos = midPos - 1;
2,147,483,647✔
183
      } else if (key > keyList[midPos]) {
195,663,468✔
184
        firstPos = midPos + 1;
191,002,297✔
185
      } else {
186
        break;
4,669,208✔
187
      }
188
    }
189
  }
190

191
  return midPos;
5,172,783✔
192
}
193

194
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
2,061,105,928✔
195
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
196
  int32_t num = -1;
2,061,105,928✔
197
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
2,061,105,928✔
198

199
  if (order == TSDB_ORDER_ASC) {
2,061,105,928✔
200
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
1,704,150,527✔
201
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
1,691,320,019✔
202
      if (item != NULL) {
1,680,505,523✔
203
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
204
      }
205
    } else {
206
      num = pDataBlockInfo->rows - startPos;
12,048,585✔
207
      if (item != NULL) {
15,486,826✔
208
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
209
      }
210
    }
211
  } else {  // desc
212
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
356,955,401✔
213
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
359,656,369✔
214
      if (item != NULL) {
359,451,486✔
215
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
216
      }
217
    } else {
218
      num = pDataBlockInfo->rows - startPos;
192,217✔
219
      if (item != NULL) {
527,735✔
220
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
221
      }
222
    }
223
  }
224

225
  return num;
2,055,390,430✔
226
}
227

228
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
9,816,620✔
229
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
230
  SqlFunctionCtx* pCtx = pSup->pCtx;
9,816,620✔
231

232
  int32_t index = 1;
9,816,620✔
233
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
29,455,328✔
234
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
19,638,708✔
235
      pCtx[k].start.key = INT64_MIN;
9,822,088✔
236
      continue;
9,822,088✔
237
    }
238

239
    SFunctParam*     pParam = &pCtx[k].param[0];
9,816,620✔
240
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
9,816,620✔
241

242
    double v1 = 0, v2 = 0, v = 0;
9,816,620✔
243
    if (prevRowIndex == -1) {
9,816,620✔
244
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
×
245
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
×
246
    } else {
247
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
9,816,620✔
248
                     typeGetTypeModFromColInfo(&pColInfo->info));
249
    }
250

251
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
9,816,620✔
252
                   typeGetTypeModFromColInfo(&pColInfo->info));
253

254
#if 0
255
    if (functionId == FUNCTION_INTERP) {
256
      if (type == RESULT_ROW_START_INTERP) {
257
        pCtx[k].start.key = prevTs;
258
        pCtx[k].start.val = v1;
259

260
        pCtx[k].end.key = curTs;
261
        pCtx[k].end.val = v2;
262

263
        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_VARBINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR ||
264
            pColInfo->info.type == TSDB_DATA_TYPE_GEOMETRY) {
265
          if (prevRowIndex == -1) {
266
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
267
          } else {
268
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
269
          }
270

271
          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
272
        }
273
      }
274
    } else if (functionId == FUNCTION_TWA) {
275
#endif
276

277
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
9,816,620✔
278
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
9,816,620✔
279
    SPoint point = (SPoint){.key = windowKey, .val = &v};
9,816,620✔
280

281
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
9,816,620✔
282
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
9,758,474✔
283
    }
284

285
    if (type == RESULT_ROW_START_INTERP) {
9,816,620✔
286
      pCtx[k].start.key = point.key;
4,881,578✔
287
      pCtx[k].start.val = v;
4,881,578✔
288
    } else {
289
      pCtx[k].end.key = point.key;
4,935,042✔
290
      pCtx[k].end.val = v;
4,935,042✔
291
    }
292

293
    index += 1;
9,816,620✔
294
  }
295
#if 0
296
  }
297
#endif
298
}
9,816,620✔
299

300
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
633,718✔
301
  if (type == RESULT_ROW_START_INTERP) {
633,718✔
302
    for (int32_t k = 0; k < numOfOutput; ++k) {
1,008,175✔
303
      pCtx[k].start.key = INT64_MIN;
664,584✔
304
    }
305
  } else {
306
    for (int32_t k = 0; k < numOfOutput; ++k) {
867,487✔
307
      pCtx[k].end.key = INT64_MIN;
577,360✔
308
    }
309
  }
310
}
633,718✔
311

312
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
5,225,169✔
313
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
314
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
5,225,169✔
315

316
  TSKEY curTs = tsCols[pos];
5,225,169✔
317

318
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
5,225,169✔
319
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
5,225,169✔
320

321
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
322
  // start exactly from this point, no need to do interpolation
323
  TSKEY key = ascQuery ? win->skey : win->ekey;
5,225,169✔
324
  if (key == curTs) {
5,225,169✔
325
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
335,329✔
326
    return true;
335,329✔
327
  }
328

329
  // it is the first time window, no need to do interpolation
330
  if (pTsKey->isNull && pos == 0) {
4,889,840✔
331
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
8,262✔
332
  } else {
333
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
4,881,578✔
334
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
4,881,578✔
335
                              RESULT_ROW_START_INTERP, pSup);
336
  }
337

338
  return true;
4,889,840✔
339
}
340

341
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
5,225,169✔
342
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
343
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
344
  int32_t code = TSDB_CODE_SUCCESS;
5,225,169✔
345
  int32_t lino = 0;
5,225,169✔
346
  int32_t order = pInfo->binfo.inputTsOrder;
5,225,169✔
347

348
  TSKEY actualEndKey = tsCols[endRowIndex];
5,225,169✔
349
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
5,225,169✔
350

351
  // not ended in current data block, do not invoke interpolation
352
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
5,225,169✔
353
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
12,257✔
354
    (*pRes) = false;
12,257✔
355
    return code;
12,257✔
356
  }
357

358
  // there is actual end point of current time window, no interpolation needs
359
  if (key == actualEndKey) {
5,212,912✔
360
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
277,870✔
361
    (*pRes) = true;
277,870✔
362
    return code;
277,870✔
363
  }
364

365
  if (nextRowIndex < 0) {
4,935,042✔
366
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
367
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
368
  }
369

370
  TSKEY nextKey = tsCols[nextRowIndex];
4,935,042✔
371
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
4,935,042✔
372
                            RESULT_ROW_END_INTERP, pSup);
373
  (*pRes) = true;
4,935,042✔
374
  return code;
4,935,042✔
375
}
376

377
bool inCalSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
2,147,483,647✔
378
  if (pInterval->interval != pInterval->sliding &&
2,147,483,647✔
379
      ((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart))) {
87,588,438✔
380
    return false;
×
381
  }
382

383
  return true;
2,147,483,647✔
384
}
385

386
bool inSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, const SDataBlockInfo* pBlockInfo) {
2,147,483,647✔
387
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
2,147,483,647✔
388
}
389

390
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
2,147,483,647✔
391
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
392
  bool ascQuery = (order != TSDB_ORDER_DESC);
2,147,483,647✔
393

394
  int32_t precision = pInterval->precision;
2,147,483,647✔
395
  getNextTimeWindow(pInterval, pNext, order);
2,147,483,647✔
396

397
  // next time window is not in current block
398
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
2,147,483,647✔
399
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
2,147,483,647✔
400
    return -1;
9,816,080✔
401
  }
402

403
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
2,147,483,647✔
404
    return -1;
×
405
  }
406

407
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
2,147,483,647✔
408
  int32_t startPos = 0;
2,147,483,647✔
409

410
  // tumbling time window query, a special case of sliding time window query
411
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
2,147,483,647✔
412
    startPos = prevPosition + 1;
2,147,483,647✔
413
  } else {
414
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
80,550,056✔
415
      startPos = 0;
6,130,133✔
416
    } else {
417
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
81,458,774✔
418
    }
419
  }
420
  if(startPos < 0 || startPos >= pDataBlockInfo->rows) {
2,147,483,647✔
421
    return -1;
2,147,483,647✔
422
  }
423

424
  /* interp query with fill should not skip time window */
425
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
426
  //    return startPos;
427
  //  }
428

429
  /*
430
   * This time window does not cover any data, try next time window,
431
   * this case may happen when the time window is too small
432
   */
433
  if (primaryKeys != NULL) {
2,125,746,436✔
434
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
2,147,483,647✔
435
      TSKEY next = primaryKeys[startPos];
1,495,025,618✔
436
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
1,495,230,298✔
437
        pNext->skey = taosTimeTruncate(next, pInterval);
616,576✔
438
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
1,504✔
439
      } else {
440
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
1,495,996,602✔
441
        pNext->skey = pNext->ekey - pInterval->interval + 1;
1,495,066,082✔
442
      }
443
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
628,809,899✔
444
      TSKEY next = primaryKeys[startPos];
358,511,036✔
445
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
358,844,716✔
UNCOV
446
        pNext->skey = taosTimeTruncate(next, pInterval);
×
447
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
448
      } else {
449
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
358,892,532✔
450
        pNext->ekey = pNext->skey + pInterval->interval - 1;
358,795,783✔
451
      }
452
    }
453
  }
454

455
  return startPos;
2,122,077,156✔
456
}
457

458
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
15,675,507✔
459
  if (type == RESULT_ROW_START_INTERP) {
15,675,507✔
460
    return pResult->startInterp == true;
5,225,169✔
461
  } else {
462
    return pResult->endInterp == true;
10,450,338✔
463
  }
464
}
465

466
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
10,438,081✔
467
  if (type == RESULT_ROW_START_INTERP) {
10,438,081✔
468
    pResult->startInterp = true;
5,225,169✔
469
  } else {
470
    pResult->endInterp = true;
5,212,912✔
471
  }
472
}
10,438,081✔
473

474
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
2,147,483,647✔
475
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
476
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
477
  int32_t lino = 0;
2,147,483,647✔
478
  if (!pInfo->timeWindowInterpo) {
2,147,483,647✔
479
    return code;
2,147,483,647✔
480
  }
481

482
  if (pBlock == NULL) {
3,577,682✔
483
    code = TSDB_CODE_INVALID_PARA;
×
484
    return code;
×
485
  }
486

487
  if (pBlock->pDataBlock == NULL) {
3,577,682✔
488
    return code;
×
489
  }
490

491
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
5,225,169✔
492

493
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
5,225,169✔
494
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
5,225,169✔
495
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
5,225,169✔
496
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
5,225,169✔
497
    if (interp) {
5,225,169✔
498
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
5,225,169✔
499
    }
500
  } else {
501
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
×
502
  }
503

504
  // point interpolation does not require the end key time window interpolation.
505
  // interpolation query does not generate the time window end interpolation
506
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
5,225,169✔
507
  if (!done) {
5,225,169✔
508
    int32_t endRowIndex = startPos + forwardRows - 1;
5,225,169✔
509
    int32_t nextRowIndex = endRowIndex + 1;
5,225,169✔
510

511
    // duplicated ts row does not involve in the interpolation of end value for current time window
512
    int32_t x = endRowIndex;
5,225,169✔
513
    while (x > 0) {
5,240,060✔
514
      if (tsCols[x] == tsCols[x - 1]) {
5,228,408✔
515
        x -= 1;
14,891✔
516
      } else {
517
        endRowIndex = x;
5,213,517✔
518
        break;
5,213,517✔
519
      }
520
    }
521

522
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
5,225,169✔
523
    bool  interp = false;
5,225,169✔
524
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
5,225,169✔
525
                                           win, &interp);
526
    QUERY_CHECK_CODE(code, lino, _end);
5,225,169✔
527
    if (interp) {
5,225,169✔
528
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
5,212,912✔
529
    }
530
  } else {
531
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
532
  }
533

534
_end:
5,225,169✔
535
  if (code != TSDB_CODE_SUCCESS) {
5,225,169✔
536
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
537
  }
538
  return code;
5,225,169✔
539
}
540

541
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
16,427✔
542
  if (pBlock->pDataBlock == NULL) {
16,427✔
543
    return;
×
544
  }
545

546
  size_t num = taosArrayGetSize(pPrevKeys);
16,427✔
547
  for (int32_t k = 0; k < num; ++k) {
49,281✔
548
    SColumn* pc = taosArrayGet(pCols, k);
32,854✔
549

550
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);
32,854✔
551

552
    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
32,854✔
553
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
32,854✔
554
      if (colDataIsNull_s(pColInfo, i)) {
65,708✔
555
        continue;
×
556
      }
557

558
      char* val = colDataGetData(pColInfo, i);
32,854✔
559
      if (IS_VAR_DATA_TYPE(pkey->type)) {
32,854✔
560
        if (IS_STR_DATA_BLOB(pkey->type)) {
×
561
          memcpy(pkey->pData, val, blobDataTLen(val));
×
562
        } else {
563
          memcpy(pkey->pData, val, varDataTLen(val));
×
564
        }
565
      } else {
566
        memcpy(pkey->pData, val, pkey->bytes);
32,854✔
567
      }
568

569
      break;
32,854✔
570
    }
571
  }
572
}
573

574
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
16,427✔
575
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
576
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
16,427✔
577

578
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
16,427✔
579
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
16,427✔
580

581
  int32_t startPos = 0;
16,427✔
582
  int32_t numOfOutput = pSup->numOfExprs;
16,427✔
583

584
  SResultRow* pResult = NULL;
16,427✔
585

586
  while (1) {
×
587
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
16,427✔
588
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
16,427✔
589
    uint64_t            groupId = pOpenWin->groupId;
16,427✔
590
    SResultRowPosition* p1 = &pOpenWin->pos;
16,427✔
591
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
16,427✔
592
      break;
16,427✔
593
    }
594

595
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
596
    if (NULL == pr) {
×
597
      T_LONG_JMP(pTaskInfo->env, terrno);
×
598
    }
599

600
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
601
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
602
      T_LONG_JMP(pTaskInfo->env, terrno);
×
603
    }
604

605
    if (pr->closed) {
×
606
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
607
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
608
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
609
        T_LONG_JMP(pTaskInfo->env, terrno);
×
610
      }
611
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
612
      taosMemoryFree(pNode);
×
613
      continue;
×
614
    }
615

616
    STimeWindow w = pr->win;
×
617
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
618
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
619
    if (ret != TSDB_CODE_SUCCESS) {
×
620
      T_LONG_JMP(pTaskInfo->env, ret);
×
621
    }
622

623
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
624
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
625
      T_LONG_JMP(pTaskInfo->env, terrno);
×
626
    }
627

628
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
629
    if (!pTsKey) {
×
630
      pTaskInfo->code = terrno;
×
631
      T_LONG_JMP(pTaskInfo->env, terrno);
×
632
    }
633

634
    int64_t prevTs = *(int64_t*)pTsKey->pData;
×
635
    if (groupId == pBlock->info.id.groupId) {
×
636
      TSKEY curTs = pBlock->info.window.skey;
×
637
      if (tsCols != NULL) {
×
638
        curTs = tsCols[startPos];
×
639
      }
640
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
641
                                RESULT_ROW_END_INTERP, pSup);
642
    }
643

644
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
645
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
646

647
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
648
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
649
                                          pBlock->info.rows, numOfExprs);
×
650
    if (ret != TSDB_CODE_SUCCESS) {
×
651
      T_LONG_JMP(pTaskInfo->env, ret);
×
652
    }
653

654
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
655
      closeResultRow(pr);
×
656
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
657
      taosMemoryFree(pNode);
×
658
    } else {  // the remains are can not be closed yet.
659
      break;
×
660
    }
661
  }
662
}
16,427✔
663

664
static bool tsKeyCompFn(void* l, void* r, void* param) {
1,695,451,844✔
665
  TSKEY*                    lTS = (TSKEY*)l;
1,695,451,844✔
666
  TSKEY*                    rTS = (TSKEY*)r;
1,695,451,844✔
667
  SIntervalAggOperatorInfo* pInfo = param;
1,695,451,844✔
668
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
1,695,451,844✔
669
}
670

671
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
267,337,303✔
672
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
267,337,303✔
673
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
267,473,183✔
674
  return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
268,131,255✔
675
}
676

677
/**
678
 * @brief check if cur window should be filtered out by limit info
679
 * @retval true if should be filtered out
680
 * @retval false if not filtering out
681
 * @note If no limit info, we skip filtering.
682
 *       If input/output ts order mismatch, we skip filtering too.
683
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
684
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
685
 *       every tuple in every block.
686
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
687
 */
688
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
2,147,483,647✔
689
                                  SExecTaskInfo* pTaskInfo) {
690
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
691
  int32_t lino = 0;
2,147,483,647✔
692
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
2,147,483,647✔
693
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
663,541,161✔
694
      // if input/output ts order mismatch, no filter
695
  ) {
696
    return false;
2,147,483,647✔
697
  }
698

699
  if (pOperatorInfo->limit == 0) return true;
269,165,159✔
700

701
  if (pOperatorInfo->pBQ == NULL) {
269,101,945✔
702
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
326,750✔
703
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
326,750✔
704
  }
705

706
  bool shouldFilter = false;
269,025,233✔
707
  // if BQ has been full, compare it with top of BQ
708
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
269,025,233✔
709
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
75,404,094✔
710
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
75,406,502✔
711
  }
712
  if (shouldFilter) {
267,969,153✔
713
    return true;
863,018✔
714
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
267,106,135✔
715
    return false;
110,870,979✔
716
  }
717

718
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
719
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
157,378,956✔
720
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
157,265,092✔
721

722
  *((TSKEY*)node.data) = win->skey;
157,265,092✔
723

724
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
157,300,180✔
725
    taosMemoryFree(node.data);
×
726
    return true;
×
727
  }
728

729
_end:
157,363,132✔
730
  if (code != TSDB_CODE_SUCCESS) {
157,259,932✔
731
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
20,296✔
732
    pTaskInfo->code = code;
20,296✔
733
    T_LONG_JMP(pTaskInfo->env, code);
×
734
  }
735
  return false;
157,239,636✔
736
}
737

738
int32_t getNumOfRowsInTimeWinUnsorted(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, STimeWindow* win,
2,147,483,647✔
739
                                      int32_t startPos) {
740
  int32_t rows = pDataBlockInfo->rows;
2,147,483,647✔
741
  for (int32_t i = startPos; i < pDataBlockInfo->rows; ++i) {
2,147,483,647✔
742
    if (pPrimaryColumn[i] >= win->skey && pPrimaryColumn[i] <= win->ekey) {
2,147,483,647✔
743
      continue;
2,147,483,647✔
744
    } else {
745
      return i - startPos;
2,147,483,647✔
746
    }
747
  }
748
  return rows - startPos;
96,772,576✔
749
}
750

751
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
106,026,882✔
752
                            int32_t scanFlag) {
753
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
106,026,882✔
754
  bool                      sorted = pInfo->binfo.inputTsOrder == ORDER_ASC || pInfo->binfo.inputTsOrder == ORDER_DESC;
106,023,708✔
755

756
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
106,027,117✔
757
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
106,012,911✔
758

759
  int32_t     startPos = 0;
106,025,786✔
760
  int32_t     numOfOutput = pSup->numOfExprs;
106,025,786✔
761
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
106,032,739✔
762
  uint64_t    tableGroupId = pBlock->info.id.groupId;
106,013,963✔
763
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
106,014,577✔
764
  SResultRow* pResult = NULL;
105,991,405✔
765
  TSKEY       ts = sorted ? getStartTsKey(&pBlock->info.window, tsCols) : tsCols[startPos];
106,005,658✔
766

767
  if (tableGroupId != pInfo->curGroupId) {
105,965,784✔
768
    pInfo->handledGroupNum += 1;
11,988,972✔
769
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
11,985,684✔
770
      return true;
14,123✔
771
    } else {
772
      pInfo->curGroupId = tableGroupId;
11,971,845✔
773
      destroyBoundedQueue(pInfo->pBQ);
11,972,447✔
774
      pInfo->pBQ = NULL;
11,968,526✔
775
    }
776
  }
777

778
  STimeWindow win =
105,991,352✔
779
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
105,989,021✔
780
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;
106,004,077✔
781

782
  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
105,281,098✔
783
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
784
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
105,284,961✔
785
    T_LONG_JMP(pTaskInfo->env, ret);
825✔
786
  }
787

788
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
105,289,532✔
789
  int32_t forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey,
10,008,936✔
790
                                                          NULL, pInfo->binfo.inputTsOrder)
791
                               : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &win, startPos);
105,289,532✔
792

793
  // prev time window not interpolation yet.
794
  if (pInfo->timeWindowInterpo) {
105,316,158✔
795
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
16,427✔
796
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
16,427✔
797

798
    // restore current time window
799
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
16,427✔
800
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
801
    if (ret != TSDB_CODE_SUCCESS) {
16,427✔
802
      T_LONG_JMP(pTaskInfo->env, ret);
×
803
    }
804

805
    // window start key interpolation
806
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
16,427✔
807
    if (ret != TSDB_CODE_SUCCESS) {
16,427✔
808
      T_LONG_JMP(pTaskInfo->env, ret);
×
809
    }
810
  }
811
  // qDebug("hashIntervalAgg1 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
812
  //   win.skey, win.ekey, startPos, forwardRows);
813
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
105,320,034✔
814
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
105,303,268✔
815
                                        pBlock->info.rows, numOfOutput);
105,297,108✔
816
  if (ret != TSDB_CODE_SUCCESS) {
105,244,322✔
817
    T_LONG_JMP(pTaskInfo->env, ret);
×
818
  }
819

820
  doCloseWindow(pResultRowInfo, pInfo, pResult);
105,244,322✔
821

822
  STimeWindow nextWin = win;
105,289,680✔
823
  int32_t rows = pBlock->info.rows;
105,305,885✔
824

825
  while (startPos < pBlock->info.rows) {
2,147,483,647✔
826
    if (sorted) {
2,147,483,647✔
827
      startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, forwardRows - 1 + startPos,
2,058,371,440✔
828
                                        pInfo->binfo.inputTsOrder);
829
      if (startPos < 0) {
2,058,389,883✔
830
        break;
9,816,080✔
831
      }
832
    } else {
833
      pBlock->info.rows = forwardRows;
2,147,483,647✔
834
      int32_t newStartOff = forwardRows >= 1
2,147,483,647✔
835
                                ? getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols + startPos,
2,147,483,647✔
836
                                                         forwardRows - 1, pInfo->binfo.inputTsOrder)
837
                                : -1;
2,147,483,647✔
838
      pBlock->info.rows = rows;
2,147,483,647✔
839
      if (newStartOff >= 0) {
2,147,483,647✔
840
        startPos += newStartOff;
74,640,101✔
841
      } else if ((startPos += forwardRows) < pBlock->info.rows) {
2,147,483,647✔
842
        getInitialStartTimeWindow(&pInfo->interval, tsCols[startPos], &nextWin, true);
2,147,483,647✔
843
      }
844
      if (startPos >= pBlock->info.rows) {
2,147,483,647✔
845
        break;
95,297,800✔
846
      }
847
    }
848

849
    if (filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
2,147,483,647✔
850
      break;
204,947✔
851
    }
852

853
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
2,147,483,647✔
854
    forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
2,047,070,835✔
855
                                                    pInfo->binfo.inputTsOrder)
856
                         : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &nextWin, startPos);
2,147,483,647✔
857
    if (forwardRows == 0) continue;
2,147,483,647✔
858

859
    // null data, failed to allocate more memory buffer
860
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
2,147,483,647✔
861
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
862
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
2,147,483,647✔
863
      T_LONG_JMP(pTaskInfo->env, code);
150✔
864
    }
865

866
    // window start(end) key interpolation
867
    code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
2,147,483,647✔
868
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
869
      T_LONG_JMP(pTaskInfo->env, code);
×
870
    }
871
    // TODO: add to open window? how to close the open windows after input blocks exhausted?
872
#if 0
873
    if ((ascScan && ekey <= pBlock->info.window.ekey) ||
874
        (!ascScan && ekey >= pBlock->info.window.skey)) {
875
      // window start(end) key interpolation
876
      doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
877
    } else if (pInfo->timeWindowInterpo) {
878
      addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
879
    }
880
#endif
881
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
2,147,483,647✔
882
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
2,147,483,647✔
883
                                          pBlock->info.rows, numOfOutput);
2,147,483,647✔
884
    if (ret != TSDB_CODE_SUCCESS) {
2,147,483,647✔
885
      T_LONG_JMP(pTaskInfo->env, ret);
×
886
    }
887
    doCloseWindow(pResultRowInfo, pInfo, pResult);
2,147,483,647✔
888
  }
889

890
  if (pInfo->timeWindowInterpo) {
107,713,237✔
891
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
16,427✔
892
  }
893
  return false;
105,317,031✔
894
}
895

896
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
2,147,483,647✔
897
  // current result is done in computing final results.
898
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
2,147,483,647✔
899
    closeResultRow(pResult);
5,212,912✔
900
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
5,212,912✔
901
    taosMemoryFree(pNode);
5,212,912✔
902
  }
903
}
2,147,483,647✔
904

905
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
16,427✔
906
                                       SExecTaskInfo* pTaskInfo) {
907
  int32_t         code = TSDB_CODE_SUCCESS;
16,427✔
908
  int32_t         lino = 0;
16,427✔
909
  SOpenWindowInfo openWin = {0};
16,427✔
910
  openWin.pos.pageId = pResult->pageId;
16,427✔
911
  openWin.pos.offset = pResult->offset;
16,427✔
912
  openWin.groupId = groupId;
16,427✔
913
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
16,427✔
914
  if (pn == NULL) {
16,427✔
915
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
16,427✔
916
    QUERY_CHECK_CODE(code, lino, _end);
16,427✔
917
    return openWin.pos;
16,427✔
918
  }
919

920
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
×
921
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
×
922
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
923
    QUERY_CHECK_CODE(code, lino, _end);
×
924
  }
925

926
_end:
×
927
  if (code != TSDB_CODE_SUCCESS) {
×
928
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
929
    pTaskInfo->code = code;
×
930
    T_LONG_JMP(pTaskInfo->env, code);
×
931
  }
932
  return openWin.pos;
×
933
}
934

935
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
110,409,027✔
936
  TSKEY* tsCols = NULL;
110,409,027✔
937

938
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
110,409,027✔
939
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
110,428,836✔
940
    if (!pColDataInfo) {
110,421,128✔
941
      pTaskInfo->code = terrno;
×
942
      T_LONG_JMP(pTaskInfo->env, terrno);
×
943
    }
944

945
    tsCols = (int64_t*)pColDataInfo->pData;
110,421,128✔
946
    if (tsCols[0] == 0) {
110,410,272✔
947
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
377✔
948
            tsCols[pBlock->info.rows - 1]);
949
    }
950

951
    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
110,425,691✔
952
      int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
7,041,924✔
953
      if (code != TSDB_CODE_SUCCESS) {
7,030,347✔
954
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
955
        pTaskInfo->code = code;
×
956
        T_LONG_JMP(pTaskInfo->env, code);
×
957
      }
958
    }
959
  }
960

961
  return tsCols;
110,414,823✔
962
}
963

964
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
29,041,420✔
965
  if (OPTR_IS_OPENED(pOperator)) {
29,041,420✔
966
    return TSDB_CODE_SUCCESS;
17,399,756✔
967
  }
968

969
  int32_t        code = TSDB_CODE_SUCCESS;
11,636,453✔
970
  int32_t        lino = 0;
11,636,453✔
971
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
11,636,453✔
972
  SOperatorInfo* downstream = pOperator->pDownstream[0];
11,643,495✔
973

974
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
11,607,601✔
975
  SExprSupp*                pSup = &pOperator->exprSupp;
11,617,199✔
976

977
  int32_t scanFlag = MAIN_SCAN;
11,639,803✔
978
  int64_t st = taosGetTimestampUs();
11,643,269✔
979

980
  pInfo->cleanGroupResInfo = false;
11,643,269✔
981
  while (1) {
106,013,392✔
982
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
117,638,427✔
983
    if (pBlock == NULL) {
117,645,778✔
984
      break;
11,037,606✔
985
    }
986

987
    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
106,608,172✔
988

989
    if (pInfo->scalarSupp.pExprInfo != NULL) {
106,624,732✔
990
      SExprSupp* pExprSup = &pInfo->scalarSupp;
12,908,799✔
991
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
12,911,171✔
992
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
12,908,289✔
993
      QUERY_CHECK_CODE(code, lino, _end);
12,909,651✔
994
    }
995

996
    // the pDataBlock are always the same one, no need to call this again
997
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
106,022,211✔
998
    QUERY_CHECK_CODE(code, lino, _end);
106,031,633✔
999
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
106,031,633✔
1000
  }
1001

1002
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
11,051,729✔
1003
  QUERY_CHECK_CODE(code, lino, _end);
11,051,246✔
1004
  pInfo->cleanGroupResInfo = true;
11,051,246✔
1005

1006
  OPTR_SET_OPENED(pOperator);
11,051,246✔
1007

1008
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
11,051,161✔
1009

1010
_end:
11,652,867✔
1011
  if (code != TSDB_CODE_SUCCESS) {
11,652,867✔
1012
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
602,473✔
1013
    pTaskInfo->code = code;
602,473✔
1014
    T_LONG_JMP(pTaskInfo->env, code);
602,473✔
1015
  }
1016
  return code;
11,050,394✔
1017
}
1018

1019
// start a new state window and record the start info
1020
void doKeepNewStateWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList,
8,213,099✔
1021
  int32_t rowIndex, uint64_t groupId, const EStateWinExtendOption* extendOption, bool hasPrevWin) {
1022
  pRowSup->groupId = groupId;
8,213,099✔
1023
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT ||
8,213,099✔
1024
      *extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
85,224✔
1025
    pRowSup->win.skey = tsList[rowIndex];
8,163,107✔
1026
    pRowSup->startRowIndex = rowIndex;
8,163,107✔
1027
    pRowSup->numOfRows = 0;  // does not include the current row yet
8,163,107✔
1028
  } else {
1029
    pRowSup->startRowIndex = hasContinuousNullRows(pRowSup) ?
99,984✔
1030
      rowIndex - pRowSup->numNullRows : rowIndex;
49,992✔
1031
    pRowSup->win.skey = hasPrevWin ?
49,992✔
1032
                        pRowSup->win.ekey + 1 : tsList[pRowSup->startRowIndex];
49,992✔
1033
    pRowSup->numOfRows = rowIndex - pRowSup->startRowIndex;
49,992✔
1034
  }
1035
  resetNumNullRows(pRowSup);
8,213,099✔
1036
}
8,213,099✔
1037

1038
// close a state window and record its end info
1039
// this functions is called when a new state row appears
1040
// @param rowIndex the index of the first row of next window
1041
void doKeepCurStateWindowEndInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, 
8,291,654✔
1042
                                 int32_t rowIndex,
1043
                                 const EStateWinExtendOption* extendOption,
1044
                                 bool hasNextWin) {
1045
  if (*extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
8,291,654✔
1046
      pRowSup->win.ekey = hasNextWin?
41,136✔
1047
                          tsList[rowIndex] - 1 : pRowSup->prevTs;
41,136✔
1048
      // continuous rows having null state col should be included in this window
1049
      pRowSup->numOfRows += hasContinuousNullRows(pRowSup) ?
82,272✔
1050
        pRowSup->numNullRows : 0;
41,136✔
1051
      resetNumNullRows(pRowSup);
41,136✔
1052
  }
1053
}
8,291,654✔
1054

1055
void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, TSKEY nullRowTs) {
19,374,373✔
1056
  pRowSup->numNullRows += 1;
19,374,373✔
1057
  pRowSup->prevTs = nullRowTs;
19,374,373✔
1058
}
19,374,373✔
1059

1060
// process a closed state window
1061
// do aggregation on the tuples within the window
1062
// partial aggregation results are stored in the output buffer
1063
static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo,
8,291,654✔
1064
  SWindowRowsSup* pRowSup, SExecTaskInfo* pTaskInfo,
1065
  SExprSupp* pSup, int32_t numOfOutput) {
1066
  int32_t     code = 0;
8,291,654✔
1067
  int32_t     lino = 0;
8,291,654✔
1068
  SResultRow* pResult = NULL;
8,291,654✔
1069
  if (pRowSup->numOfRows == 0) {
8,291,654✔
1070
    // no valid rows within the window
1071
    return TSDB_CODE_SUCCESS;
139,835✔
1072
  }
1073
  code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win,
16,303,638✔
1074
    true, &pResult, pRowSup->groupId, pSup->pCtx, numOfOutput,
8,151,819✔
1075
    pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1076
  QUERY_CHECK_CODE(code, lino, _return);
8,151,819✔
1077

1078
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
8,151,819✔
1079
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx,
8,151,819✔
1080
    &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1081
    pRowSup->numOfRows, 0, numOfOutput);
1082
  QUERY_CHECK_CODE(code, lino, _return);
8,151,819✔
1083

1084
_return:
8,151,819✔
1085
  if (code != TSDB_CODE_SUCCESS) {
8,151,819✔
1086
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
418✔
1087
  }
1088
  return code;
8,151,819✔
1089
}
1090

1091
// process a data block for state window aggregation
1092
// scan from startIndex to endIndex
1093
// numPartialCalcRows returns the number of rows that have been
1094
// partially calculated within the block
1095
static void doStateWindowAggImpl(SOperatorInfo* pOperator,
1,151,062✔
1096
                                 SStateWindowOperatorInfo* pInfo,
1097
                                 SSDataBlock* pBlock, int32_t* startIndex,
1098
                                 int32_t* endIndex,
1099
                                 int32_t* numPartialCalcRows) {
1100
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,151,062✔
1101
  SExprSupp*     pExprSup = &pOperator->exprSupp;
1,151,062✔
1102

1103
  SColumnInfoData* pStateColInfoData = 
1104
    taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
1,151,062✔
1105
  if (!pStateColInfoData) {
1,151,062✔
1106
    pTaskInfo->code = terrno;
×
1107
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1108
  }
1109
  uint64_t gid = pBlock->info.id.groupId;
1,151,062✔
1110
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
1,151,062✔
1111
  int32_t bytes = pStateColInfoData->info.bytes;
1,151,062✔
1112

1113
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock,
1,151,062✔
1114
                                               pInfo->tsSlotId);
1,151,062✔
1115
  if (NULL == pColInfoData) {
1,151,062✔
1116
    pTaskInfo->code = terrno;
×
1117
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1118
  }
1119
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
1,151,062✔
1120

1121
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ?
1,151,062✔
1122
                                &pBlock->pBlockAgg[pInfo->stateCol.slotId] :
1,151,062✔
1123
                                NULL;
1124
  EStateWinExtendOption  extendOption = pInfo->extendOption;
1,151,062✔
1125
  SWindowRowsSup*        pRowSup = &pInfo->winSup;
1,151,062✔
1126

1127
  if (pRowSup->groupId != gid) {
1,151,062✔
1128
    /*
1129
      group changed, process the previous group's unclosed state window first
1130
    */
1131
    doKeepCurStateWindowEndInfo(pRowSup, tsList, 0, &extendOption, false);
136,883✔
1132
    int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
136,883✔
1133
                                            pExprSup, numOfOutput);
1134
    if (TSDB_CODE_SUCCESS != code) T_LONG_JMP(pTaskInfo->env, code);
136,883✔
1135
    *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
136,883✔
1136

1137
    /*
1138
      unhandled null rows should be ignored, since they belong to previous group
1139
    */
1140
    *numPartialCalcRows += pRowSup->numNullRows;
136,883✔
1141

1142
    /*
1143
      reset state window info for new group
1144
    */
1145
    pInfo->hasKey = false;
136,883✔
1146
    resetWindowRowsSup(pRowSup);
136,883✔
1147
  }
1148

1149
  for (int32_t j = *startIndex; j < *endIndex; ++j) {
86,849,017✔
1150
    if (pBlock->info.scanFlag != PRE_SCAN) {
85,775,054✔
1151
      if (pInfo->winSup.lastTs == INT64_MIN || gid != pRowSup->groupId || !pInfo->hasKey) {
85,717,336✔
1152
        pInfo->winSup.lastTs = tsList[j];
5,217,518✔
1153
      } else {
1154
        if (tsList[j] == pInfo->winSup.lastTs) {
80,499,818✔
1155
          // forbid duplicated ts rows
1156
          qError("%s:%d duplicated ts found in state window aggregation", __FILE__, __LINE__);
77,099✔
1157
          pTaskInfo->code = TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP;
77,099✔
1158
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP);
77,099✔
1159
        } else {
1160
          pInfo->winSup.lastTs = tsList[j];
80,422,719✔
1161
        }
1162
      }
1163
    }
1164
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
171,395,910✔
1165
      doKeepStateWindowNullInfo(pRowSup, tsList[j]);
19,374,373✔
1166
      continue;
19,374,373✔
1167
    }
1168
    if (pStateColInfoData->pData == NULL) {
66,323,582✔
1169
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1170
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1171
    }
1172
    char* val = colDataGetData(pStateColInfoData, j);
66,323,582✔
1173

1174
    if (!pInfo->hasKey) {
66,323,582✔
1175
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
1,120,798✔
1176
      pInfo->hasKey = true;
1,120,798✔
1177
      doKeepNewStateWindowStartInfo(
1,120,798✔
1178
        pRowSup, tsList, j, gid, &extendOption, false);
1179
      doKeepTuple(pRowSup, tsList[j], j, gid);
1,120,798✔
1180
    } else if (!compareVal(val, &pInfo->stateKey)) {
65,202,784✔
1181
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
7,092,301✔
1182
      int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
7,092,301✔
1183
                                              pExprSup, numOfOutput);
1184
      if (TSDB_CODE_SUCCESS != code) {
7,092,301✔
1185
        T_LONG_JMP(pTaskInfo->env, code);
×
1186
      }
1187
      *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
7,092,301✔
1188

1189
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid,
7,092,301✔
1190
                                    &extendOption, true);
1191
      doKeepTuple(pRowSup, tsList[j], j, gid);
7,092,301✔
1192
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
7,092,301✔
1193
    } else {
1194
      doKeepTuple(pRowSup, tsList[j], j, gid);
58,110,483✔
1195
    }
1196
  }
1197

1198
  if (!pInfo->hasKey && extendOption != STATE_WIN_EXTEND_OPTION_FORWARD) {
1,073,963✔
1199
    /*
1200
      No valid state rows within the block and we don't care about
1201
      null rows before valid state window, mark them as processed and drop them
1202
    */
1203
    *numPartialCalcRows = pBlock->info.rows;
8,910✔
1204
    return;
8,910✔
1205
  }
1206
  if (pRowSup->numOfRows == 0 && 
1,065,053✔
1207
      extendOption != STATE_WIN_EXTEND_OPTION_BACKWARD) {
3,690✔
1208
    /*
1209
      If no valid state window or we don't know the belonging of
1210
      these null rows, return and handle them with next block
1211
    */
1212
    return;
2,583✔
1213
  }
1214
  doKeepCurStateWindowEndInfo(pRowSup, tsList, *endIndex, &extendOption, false);
1,062,470✔
1215
  int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
1,062,470✔
1216
                                          pExprSup, numOfOutput);
1217
  if (TSDB_CODE_SUCCESS != code) {
1,062,470✔
1218
    pTaskInfo->code = code;
418✔
1219
    T_LONG_JMP(pTaskInfo->env, code);
418✔
1220
  }
1221
  *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
1,062,052✔
1222
  // reset part of pRowSup after doing agg calculation
1223
  pRowSup->startRowIndex = 0;
1,062,052✔
1224
  pRowSup->numOfRows = 0;
1,062,052✔
1225
}
1226

1227
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
2,082,972✔
1228
  if (OPTR_IS_OPENED(pOperator)) {
2,082,972✔
1229
    return TSDB_CODE_SUCCESS;
73,402✔
1230
  }
1231

1232
  int32_t                   code = TSDB_CODE_SUCCESS;
2,009,570✔
1233
  int32_t                   lino = 0;
2,009,570✔
1234
  SStateWindowOperatorInfo* pInfo = pOperator->info;
2,009,570✔
1235
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
2,009,570✔
1236

1237
  SExprSupp* pSup = &pOperator->exprSupp;
2,009,570✔
1238
  int32_t    order = pInfo->binfo.inputTsOrder;
2,009,570✔
1239
  int64_t    st = taosGetTimestampUs();
2,009,570✔
1240

1241
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2,009,570✔
1242
  pInfo->cleanGroupResInfo = false;
2,009,570✔
1243

1244
  SSDataBlock* pUnfinishedBlock = NULL;
2,009,570✔
1245
  int32_t      startIndex = 0;
2,009,570✔
1246
  int32_t      endIndex = 0;
2,009,570✔
1247
  int32_t      numPartialCalcRows = 0;
2,009,570✔
1248
  while (1) {
1,073,545✔
1249
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
3,083,115✔
1250
    if (pBlock == NULL) {
3,082,017✔
1251
      if (pUnfinishedBlock != NULL) {
1,540,041✔
1252
        blockDataDestroy(pUnfinishedBlock);
11,397✔
1253
        pUnfinishedBlock = NULL;
11,397✔
1254
        resetWindowRowsSup(&pInfo->winSup);
11,397✔
1255
      }
1256
      break;
1,540,041✔
1257
    }
1258
    
1259
    // mark whether pUnfinishedBlock is a reference to pBlock
1260
    bool isRef = false;
1,541,976✔
1261
    startIndex = 0;
1,541,976✔
1262
    if (pUnfinishedBlock != NULL) {
1,541,976✔
1263
      startIndex = pUnfinishedBlock->info.rows;
5,904✔
1264
      // merge unfinished block with current block
1265
      code = blockDataMerge(pUnfinishedBlock, pBlock);
5,904✔
1266
      // reset id to current block id
1267
      pUnfinishedBlock->info.id = pBlock->info.id;
5,904✔
1268
      QUERY_CHECK_CODE(code, lino, _end);
5,904✔
1269
    } else {
1270
      pUnfinishedBlock = pBlock;
1,536,072✔
1271
      isRef = true;
1,536,072✔
1272
    }
1273
    endIndex = pUnfinishedBlock->info.rows;
1,541,976✔
1274

1275
    pInfo->binfo.pRes->info.scanFlag = pUnfinishedBlock->info.scanFlag;
1,541,976✔
1276
    code = setInputDataBlock(
1,541,976✔
1277
      pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
1,541,976✔
1278
    QUERY_CHECK_CODE(code, lino, _end);
1,541,976✔
1279

1280
    code = blockDataUpdateTsWindow(pUnfinishedBlock, pInfo->tsSlotId);
1,541,976✔
1281
    QUERY_CHECK_CODE(code, lino, _end);
1,541,976✔
1282

1283
    // there is an scalar expression that 
1284
    // needs to be calculated right before apply the group aggregation.
1285
    if (pInfo->scalarSup.pExprInfo != NULL) {
1,541,976✔
1286
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo,
1,368,795✔
1287
        pUnfinishedBlock, pUnfinishedBlock, pInfo->scalarSup.pCtx,
1288
        pInfo->scalarSup.numOfExprs, NULL,
1289
        GET_STM_RTINFO(pOperator->pTaskInfo));
1,368,795✔
1290
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
1,368,795✔
1291
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
390,914✔
1292
      }
1293
    }
1294

1295
    doStateWindowAggImpl(pOperator, pInfo, pUnfinishedBlock, 
1,151,062✔
1296
      &startIndex, &endIndex, &numPartialCalcRows);
1297
    if (numPartialCalcRows < pUnfinishedBlock->info.rows) {
1,073,545✔
1298
      // save unfinished block for next round processing
1299
      if (isRef) {
17,301✔
1300
        code = createOneDataBlock(pBlock, true, &pUnfinishedBlock);
14,718✔
1301
        QUERY_CHECK_CODE(code, lino, _end);
14,718✔
1302
      }
1303
      code = blockDataTrimFirstRows(pUnfinishedBlock, numPartialCalcRows);
17,301✔
1304
      QUERY_CHECK_NULL(pUnfinishedBlock, code, lino, _end, terrno);
17,301✔
1305
    } else {
1306
      if (!isRef) {
1,056,244✔
1307
        blockDataDestroy(pUnfinishedBlock);
3,321✔
1308
      }
1309
      pUnfinishedBlock = NULL;
1,056,244✔
1310
    }
1311
    numPartialCalcRows = 0;
1,073,545✔
1312
  }
1313

1314
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1,540,041✔
1315
  code = initGroupedResultInfo(
1,540,041✔
1316
    &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
1317
  QUERY_CHECK_CODE(code, lino, _end);
1,540,041✔
1318
  pInfo->cleanGroupResInfo = true;
1,540,041✔
1319
  pOperator->status = OP_RES_TO_RETURN;
1,540,041✔
1320

1321
_end:
1,540,041✔
1322
  if (code != TSDB_CODE_SUCCESS) {
1,540,041✔
1323
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1324
    pTaskInfo->code = code;
×
1325
    T_LONG_JMP(pTaskInfo->env, code);
×
1326
  }
1327
  return code;
1,540,041✔
1328
}
1329

1330
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
2,999,285✔
1331
  if (pOperator->status == OP_EXEC_DONE) {
2,999,285✔
1332
    (*ppRes) = NULL;
916,313✔
1333
    return TSDB_CODE_SUCCESS;
916,313✔
1334
  }
1335

1336
  int32_t                   code = TSDB_CODE_SUCCESS;
2,082,972✔
1337
  int32_t                   lino = 0;
2,082,972✔
1338
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
2,082,972✔
1339
  SStateWindowOperatorInfo* pInfo = pOperator->info;
2,082,972✔
1340
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;
2,082,972✔
1341

1342
  code = pOperator->fpSet._openFn(pOperator);
2,082,972✔
1343
  QUERY_CHECK_CODE(code, lino, _end);
1,613,443✔
1344

1345
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
1,613,443✔
1346
  QUERY_CHECK_CODE(code, lino, _end);
1,613,443✔
1347

1348
  while (1) {
×
1349
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,613,443✔
1350
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,613,443✔
1351
    QUERY_CHECK_CODE(code, lino, _end);
1,613,443✔
1352

1353
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,613,443✔
1354
    if (!hasRemain) {
1,613,443✔
1355
      setOperatorCompleted(pOperator);
1,540,041✔
1356
      break;
1,540,041✔
1357
    }
1358

1359
    if (pBInfo->pRes->info.rows > 0) {
73,402✔
1360
      break;
73,402✔
1361
    }
1362
  }
1363

1364
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1,613,443✔
1365

1366
_end:
1,613,443✔
1367
  if (code != TSDB_CODE_SUCCESS) {
1,613,443✔
1368
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1369
    pTaskInfo->code = code;
×
1370
    T_LONG_JMP(pTaskInfo->env, code);
×
1371
  }
1372
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1,613,443✔
1373
  return code;
1,613,443✔
1374
}
1375

1376
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
36,502,627✔
1377
  int32_t                   code = TSDB_CODE_SUCCESS;
36,502,627✔
1378
  int32_t                   lino = 0;
36,502,627✔
1379
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
36,502,627✔
1380
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
36,510,011✔
1381

1382
  if (pOperator->status == OP_EXEC_DONE) {
36,499,787✔
1383
    (*ppRes) = NULL;
7,458,808✔
1384
    return code;
7,458,524✔
1385
  }
1386

1387
  SSDataBlock* pBlock = pInfo->binfo.pRes;
29,015,877✔
1388
  code = pOperator->fpSet._openFn(pOperator);
29,006,105✔
1389
  QUERY_CHECK_CODE(code, lino, _end);
28,451,002✔
1390

1391
  while (1) {
5,595✔
1392
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
28,456,597✔
1393
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
28,457,449✔
1394
    QUERY_CHECK_CODE(code, lino, _end);
28,457,449✔
1395

1396
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
28,457,449✔
1397
    if (!hasRemain) {
28,457,165✔
1398
      setOperatorCompleted(pOperator);
11,041,475✔
1399
      break;
11,041,475✔
1400
    }
1401

1402
    if (pBlock->info.rows > 0) {
17,415,690✔
1403
      break;
17,410,095✔
1404
    }
1405
  }
1406

1407
  size_t rows = pBlock->info.rows;
28,451,570✔
1408
  pOperator->resultInfo.totalRows += rows;
28,451,286✔
1409

1410
_end:
28,451,286✔
1411
  if (code != TSDB_CODE_SUCCESS) {
28,451,286✔
1412
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1413
    pTaskInfo->code = code;
×
1414
    T_LONG_JMP(pTaskInfo->env, code);
×
1415
  }
1416
  (*ppRes) = (rows == 0) ? NULL : pBlock;
28,451,286✔
1417
  return code;
28,451,002✔
1418
}
1419

1420
static void destroyStateWindowOperatorInfo(void* param) {
1,972,392✔
1421
  if (param == NULL) {
1,972,392✔
1422
    return;
×
1423
  }
1424
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
1,972,392✔
1425
  cleanupBasicInfo(&pInfo->binfo);
1,972,392✔
1426
  taosMemoryFreeClear(pInfo->stateKey.pData);
1,972,392✔
1427
  if (pInfo->pOperator) {
1,972,392✔
1428
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,972,392✔
1429
                      pInfo->cleanGroupResInfo);
1,972,392✔
1430
    pInfo->pOperator = NULL;
1,972,392✔
1431
  }
1432

1433
  cleanupExprSupp(&pInfo->scalarSup);
1,972,392✔
1434
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,972,392✔
1435
  cleanupAggSup(&pInfo->aggSup);
1,972,392✔
1436
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,972,392✔
1437

1438
  taosMemoryFreeClear(param);
1,972,392✔
1439
}
1440

1441
static void freeItem(void* param) {
30,798✔
1442
  SGroupKeys* pKey = (SGroupKeys*)param;
30,798✔
1443
  taosMemoryFree(pKey->pData);
30,798✔
1444
}
30,798✔
1445

1446
void destroyIntervalOperatorInfo(void* param) {
13,480,279✔
1447
  if (param == NULL) {
13,480,279✔
1448
    return;
×
1449
  }
1450

1451
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
13,480,279✔
1452

1453
  cleanupBasicInfo(&pInfo->binfo);
13,480,279✔
1454
  if (pInfo->pOperator) {
13,479,427✔
1455
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
12,930,857✔
1456
                      pInfo->cleanGroupResInfo);
12,930,857✔
1457
    pInfo->pOperator = NULL;
12,930,857✔
1458
  }
1459

1460
  cleanupAggSup(&pInfo->aggSup);
13,479,427✔
1461
  cleanupExprSupp(&pInfo->scalarSupp);
13,479,427✔
1462

1463
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
13,480,279✔
1464

1465
  taosArrayDestroy(pInfo->pInterpCols);
13,479,711✔
1466
  pInfo->pInterpCols = NULL;
13,479,143✔
1467

1468
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
13,479,711✔
1469
  pInfo->pPrevValues = NULL;
13,478,007✔
1470

1471
  cleanupGroupResInfo(&pInfo->groupResInfo);
13,478,575✔
1472
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
13,477,439✔
1473
  destroyBoundedQueue(pInfo->pBQ);
13,478,291✔
1474
  taosMemoryFreeClear(param);
13,478,007✔
1475
}
1476

1477
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
15,399✔
1478
  int32_t code = TSDB_CODE_SUCCESS;
15,399✔
1479
  int32_t lino = 0;
15,399✔
1480
  void*   tmp = NULL;
15,399✔
1481

1482
  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
15,399✔
1483
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
15,399✔
1484

1485
  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
15,399✔
1486
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
15,399✔
1487

1488
  {  // ts column
1489
    SColumn c = {0};
15,399✔
1490
    c.colId = 1;
15,399✔
1491
    c.slotId = pInfo->primaryTsIndex;
15,399✔
1492
    c.type = TSDB_DATA_TYPE_TIMESTAMP;
15,399✔
1493
    c.bytes = sizeof(int64_t);
15,399✔
1494
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
15,399✔
1495
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
15,399✔
1496

1497
    SGroupKeys key;
15,399✔
1498
    key.bytes = c.bytes;
15,399✔
1499
    key.type = c.type;
15,399✔
1500
    key.isNull = true;  // to denote no value is assigned yet
15,399✔
1501
    key.pData = taosMemoryCalloc(1, c.bytes);
15,399✔
1502
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
15,399✔
1503

1504
    tmp = taosArrayPush(pInfo->pPrevValues, &key);
15,399✔
1505
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
15,399✔
1506
  }
1507
_end:
15,399✔
1508
  if (code != TSDB_CODE_SUCCESS) {
15,399✔
1509
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1510
  }
1511
  return code;
15,399✔
1512
}
1513

1514
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
12,915,380✔
1515
                                      bool* pRes) {
1516
  // the primary timestamp column
1517
  bool    needed = false;
12,915,380✔
1518
  int32_t code = TSDB_CODE_SUCCESS;
12,915,380✔
1519
  int32_t lino = 0;
12,915,380✔
1520
  void*   tmp = NULL;
12,915,380✔
1521

1522
  for (int32_t i = 0; i < numOfCols; ++i) {
39,737,921✔
1523
    SExprInfo* pExpr = pCtx[i].pExpr;
26,879,350✔
1524
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
26,845,312✔
1525
      needed = true;
15,399✔
1526
      break;
15,399✔
1527
    }
1528
  }
1529

1530
  if (needed) {
12,873,970✔
1531
    code = initWindowInterpPrevVal(pInfo);
15,399✔
1532
    QUERY_CHECK_CODE(code, lino, _end);
15,399✔
1533
  }
1534

1535
  for (int32_t i = 0; i < numOfCols; ++i) {
39,645,744✔
1536
    SExprInfo* pExpr = pCtx[i].pExpr;
26,825,206✔
1537

1538
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
26,813,399✔
1539
      SFunctParam* pParam = &pExpr->base.pParam[0];
15,399✔
1540

1541
      SColumn c = *pParam->pCol;
15,399✔
1542
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
15,399✔
1543
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
15,399✔
1544

1545
      SGroupKeys key = {0};
15,399✔
1546
      key.bytes = c.bytes;
15,399✔
1547
      key.type = c.type;
15,399✔
1548
      key.isNull = false;
15,399✔
1549
      key.pData = taosMemoryCalloc(1, c.bytes);
15,399✔
1550
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
15,399✔
1551

1552
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
15,399✔
1553
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
15,399✔
1554
    }
1555
  }
1556

1557
_end:
12,820,538✔
1558
  if (code != TSDB_CODE_SUCCESS) {
12,830,597✔
1559
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1560
  }
1561
  *pRes = needed;
12,830,597✔
1562
  return code;
12,871,997✔
1563
}
1564

1565
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo){
1,416✔
1566
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
1,416✔
1567
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
1,416✔
1568
  pOper->status = OP_NOT_OPENED;
1,416✔
1569

1570
  resetBasicOperatorState(&pIntervalInfo->binfo);
1,416✔
1571
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp, &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup,
1,416✔
1572
    pIntervalInfo->cleanGroupResInfo);
1,416✔
1573

1574
  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
1,416✔
1575
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
1,416✔
1576
  if (code == 0) {
1,416✔
1577
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
2,832✔
1578
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
1,416✔
1579
                       &pTaskInfo->storageAPI.functionStore);
1580
  }
1581
  if (code == 0) {
1,416✔
1582
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
1,416✔
1583
                         &pTaskInfo->storageAPI.functionStore);
1584
  }
1585

1586
  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL){
1,416✔
1587
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
×
1588
  }
1589

1590
  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
1,416✔
1591
    pIntervalInfo->curGroupId = UINT64_MAX;
×
1592
  }
1593

1594
  pIntervalInfo->cleanGroupResInfo = false;
1,416✔
1595
  pIntervalInfo->handledGroupNum = 0;
1,416✔
1596
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
1,416✔
1597
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;
1,416✔
1598

1599
  taosArrayDestroy(pIntervalInfo->pInterpCols);
1,416✔
1600
  pIntervalInfo->pInterpCols = NULL;
1,416✔
1601

1602
  if (pIntervalInfo->pPrevValues != NULL) {
1,416✔
1603
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
×
1604
    pIntervalInfo->pPrevValues = NULL;
×
1605
    code = initWindowInterpPrevVal(pIntervalInfo);
×
1606
  }
1607

1608
  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
1,416✔
1609
  destroyBoundedQueue(pIntervalInfo->pBQ);
1,416✔
1610
  pIntervalInfo->pBQ = NULL;
1,416✔
1611
  return code;
1,416✔
1612
}
1613

1614
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
1,416✔
1615
  SIntervalAggOperatorInfo* pInfo = pOper->info;
1,416✔
1616
  return resetInterval(pOper, pInfo);
1,416✔
1617
}
1618

1619
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
12,235,530✔
1620
                                   SOperatorInfo** pOptrInfo) {
1621
  QRY_PARAM_CHECK(pOptrInfo);
12,235,530✔
1622

1623
  int32_t                   code = TSDB_CODE_SUCCESS;
12,240,149✔
1624
  int32_t                   lino = 0;
12,240,149✔
1625
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
12,240,149✔
1626
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
12,181,184✔
1627
  if (pInfo == NULL || pOperator == NULL) {
12,190,635✔
1628
    code = terrno;
×
1629
    lino = __LINE__;
×
1630
    goto _error;
×
1631
  }
1632

1633
  pOperator->pPhyNode = pPhyNode;
12,191,487✔
1634
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
12,201,740✔
1635
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
12,247,413✔
1636
  initBasicInfo(&pInfo->binfo, pResBlock);
12,247,413✔
1637

1638
  SExprSupp* pSup = &pOperator->exprSupp;
12,230,682✔
1639
  pSup->hasWindowOrGroup = true;
12,231,509✔
1640
  pSup->hasWindow = true;
12,229,970✔
1641

1642
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
12,234,962✔
1643

1644
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
12,231,169✔
1645
  initResultSizeInfo(&pOperator->resultInfo, 512);
12,231,169✔
1646
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
12,228,358✔
1647
  QUERY_CHECK_CODE(code, lino, _error);
12,234,834✔
1648

1649
  int32_t    num = 0;
12,234,834✔
1650
  SExprInfo* pExprInfo = NULL;
12,233,414✔
1651
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
12,236,254✔
1652
  QUERY_CHECK_CODE(code, lino, _error);
12,240,678✔
1653

1654
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
12,240,678✔
1655
                    &pTaskInfo->storageAPI.functionStore);
1656
  QUERY_CHECK_CODE(code, lino, _error);
12,222,555✔
1657

1658
  SInterval interval = {.interval = pPhyNode->interval,
36,639,106✔
1659
                        .sliding = pPhyNode->sliding,
12,209,686✔
1660
                        .intervalUnit = pPhyNode->intervalUnit,
12,223,613✔
1661
                        .slidingUnit = pPhyNode->slidingUnit,
12,192,143✔
1662
                        .offset = pPhyNode->offset,
12,185,027✔
1663
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
12,211,362✔
1664
                        .timeRange = pPhyNode->timeRange};
1665
  calcIntervalAutoOffset(&interval);
12,227,414✔
1666

1667
  STimeWindowAggSupp as = {
12,189,223✔
1668
      .maxTs = INT64_MIN,
1669
  };
1670

1671
  pInfo->win = pTaskInfo->window;
12,189,223✔
1672
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
12,197,513✔
1673
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
12,235,814✔
1674
  pInfo->interval = interval;
12,190,741✔
1675
  pInfo->twAggSup = as;
12,196,968✔
1676
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
12,238,041✔
1677
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
12,186,052✔
1678
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
976,589✔
1679
    pInfo->limited = true;
977,487✔
1680
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
977,076✔
1681
  }
1682
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
12,210,395✔
1683
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
68,153✔
1684
    pInfo->slimited = true;
68,564✔
1685
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
68,153✔
1686
    pInfo->curGroupId = UINT64_MAX;
68,153✔
1687
  }
1688

1689
  if (pPhyNode->window.pExprs != NULL) {
12,218,704✔
1690
    int32_t    numOfScalar = 0;
4,599,834✔
1691
    SExprInfo* pScalarExprInfo = NULL;
4,599,102✔
1692
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
4,582,952✔
1693
    QUERY_CHECK_CODE(code, lino, _error);
4,602,554✔
1694

1695
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
4,053,416✔
1696
    if (code != TSDB_CODE_SUCCESS) {
4,051,952✔
1697
      goto _error;
×
1698
    }
1699
  }
1700

1701
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
11,683,223✔
1702
                            pTaskInfo->pStreamRuntimeInfo);
11,692,996✔
1703
  if (code != TSDB_CODE_SUCCESS) {
11,662,114✔
1704
    goto _error;
×
1705
  }
1706

1707
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
11,662,114✔
1708
  QUERY_CHECK_CODE(code, lino, _error);
11,671,462✔
1709

1710
  pInfo->timeWindowInterpo = false;
11,671,462✔
1711
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
11,676,820✔
1712
  QUERY_CHECK_CODE(code, lino, _error);
11,666,342✔
1713
  if (pInfo->timeWindowInterpo) {
11,666,342✔
1714
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
15,399✔
1715
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
15,399✔
1716
      goto _error;
×
1717
    }
1718
  }
1719

1720
  pInfo->pOperator = pOperator;
11,670,849✔
1721
  pInfo->cleanGroupResInfo = false;
11,685,457✔
1722
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
11,675,738✔
1723
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
11,684,545✔
1724
                  pInfo, pTaskInfo);
1725

1726
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
11,682,396✔
1727
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1728
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
11,660,747✔
1729
  code = appendDownstream(pOperator, &downstream, 1);
11,676,829✔
1730
  if (code != TSDB_CODE_SUCCESS) {
11,656,237✔
1731
    goto _error;
×
1732
  }
1733

1734
  *pOptrInfo = pOperator;
11,656,237✔
1735
  return TSDB_CODE_SUCCESS;
11,676,396✔
1736

1737
_error:
549,138✔
1738
  if (pInfo != NULL) {
549,138✔
1739
    destroyIntervalOperatorInfo(pInfo);
549,138✔
1740
  }
1741

1742
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
548,570✔
1743
  pTaskInfo->code = code;
548,854✔
1744
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
548,854✔
1745
  return code;
549,138✔
1746
}
1747

1748
// todo handle multiple timeline cases. assume no timeline interweaving
1749
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
2,022,928✔
1750
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,022,928✔
1751
  SExprSupp*     pSup = &pOperator->exprSupp;
2,022,928✔
1752

1753
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
2,022,928✔
1754
  if (!pColInfoData) {
2,022,928✔
1755
    pTaskInfo->code = terrno;
×
1756
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1757
  }
1758

1759
  bool    masterScan = true;
2,022,928✔
1760
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
2,022,928✔
1761
  int64_t gid = pBlock->info.id.groupId;
2,022,928✔
1762

1763
  int64_t gap = pInfo->gap;
2,022,928✔
1764

1765
  if (!pInfo->reptScan) {
2,022,928✔
1766
    pInfo->reptScan = true;
1,651,931✔
1767
    pInfo->winSup.prevTs = INT64_MIN;
1,651,931✔
1768
  }
1769

1770
  SWindowRowsSup* pRowSup = &pInfo->winSup;
2,022,928✔
1771
  pRowSup->numOfRows = 0;
2,022,928✔
1772
  pRowSup->startRowIndex = 0;
2,022,928✔
1773

1774
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
1775
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
2,022,928✔
1776
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
396,481,596✔
1777
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
394,458,668✔
1778
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
1,656,110✔
1779
      doKeepTuple(pRowSup, tsList[j], j, gid);
1,656,110✔
1780
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
392,802,558✔
1781
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
118,405,608✔
1782
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1783
      doKeepTuple(pRowSup, tsList[j], j, gid);
274,441,997✔
1784
    } else {  // start a new session window
1785
      // start a new session window
1786
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
118,360,561✔
1787
        SResultRow* pResult = NULL;
118,236,461✔
1788

1789
        // keep the time window for the closed time window.
1790
        STimeWindow window = pRowSup->win;
118,236,461✔
1791

1792
        int32_t ret =
1793
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
118,236,461✔
1794
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1795
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
118,236,461✔
1796
          T_LONG_JMP(pTaskInfo->env, ret);
×
1797
        }
1798

1799
        // pInfo->numOfRows data belong to the current session window
1800
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
118,236,461✔
1801
        ret =
1802
            applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
118,236,461✔
1803
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
118,236,461✔
1804
        if (ret != TSDB_CODE_SUCCESS) {
118,236,461✔
1805
          T_LONG_JMP(pTaskInfo->env, ret);
×
1806
        }
1807
      }
1808

1809
      // here we start a new session window
1810
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
118,360,561✔
1811
      doKeepTuple(pRowSup, tsList[j], j, gid);
118,360,561✔
1812
    }
1813
  }
1814

1815
  SResultRow* pResult = NULL;
2,022,928✔
1816
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
2,022,928✔
1817
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
2,022,928✔
1818
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1819
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
2,022,928✔
1820
    T_LONG_JMP(pTaskInfo->env, ret);
×
1821
  }
1822

1823
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
2,022,928✔
1824
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
2,022,928✔
1825
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
2,022,928✔
1826
  if (ret != TSDB_CODE_SUCCESS) {
2,022,928✔
1827
    T_LONG_JMP(pTaskInfo->env, ret);
×
1828
  }
1829
}
2,022,928✔
1830

1831
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,612,354✔
1832
  if (pOperator->status == OP_EXEC_DONE) {
3,612,354✔
1833
    (*ppRes) = NULL;
1,179,027✔
1834
    return TSDB_CODE_SUCCESS;
1,179,027✔
1835
  }
1836

1837
  int32_t                  code = TSDB_CODE_SUCCESS;
2,433,327✔
1838
  int32_t                  lino = 0;
2,433,327✔
1839
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
2,433,327✔
1840
  SSessionAggOperatorInfo* pInfo = pOperator->info;
2,433,327✔
1841
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
2,433,327✔
1842
  SExprSupp*               pSup = &pOperator->exprSupp;
2,433,327✔
1843

1844
  if (pOperator->status == OP_RES_TO_RETURN) {
2,433,327✔
1845
    while (1) {
×
1846
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
4,179✔
1847
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
4,179✔
1848
      QUERY_CHECK_CODE(code, lino, _end);
4,179✔
1849

1850
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
4,179✔
1851
      if (!hasRemain) {
4,179✔
1852
        setOperatorCompleted(pOperator);
2,751✔
1853
        break;
2,751✔
1854
      }
1855

1856
      if (pBInfo->pRes->info.rows > 0) {
1,428✔
1857
        break;
1,428✔
1858
      }
1859
    }
1860
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
4,179✔
1861
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
4,179✔
1862
    return code;
4,179✔
1863
  }
1864

1865
  int64_t st = taosGetTimestampUs();
2,429,148✔
1866
  int32_t order = pInfo->binfo.inputTsOrder;
2,429,148✔
1867

1868
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2,429,148✔
1869

1870
  pInfo->cleanGroupResInfo = false;
2,429,148✔
1871
  while (1) {
2,022,928✔
1872
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
4,452,076✔
1873
    if (pBlock == NULL) {
4,452,076✔
1874
      break;
2,429,148✔
1875
    }
1876

1877
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
2,022,928✔
1878
    if (pInfo->scalarSupp.pExprInfo != NULL) {
2,022,928✔
1879
      SExprSupp* pExprSup = &pInfo->scalarSupp;
1,347✔
1880
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
1,347✔
1881
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
1,347✔
1882
      QUERY_CHECK_CODE(code, lino, _end);
1,347✔
1883
    }
1884
    // the pDataBlock are always the same one, no need to call this again
1885
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
2,022,928✔
1886
    QUERY_CHECK_CODE(code, lino, _end);
2,022,928✔
1887

1888
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
2,022,928✔
1889
    QUERY_CHECK_CODE(code, lino, _end);
2,022,928✔
1890

1891
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
2,022,928✔
1892
  }
1893

1894
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
2,429,148✔
1895

1896
  // restore the value
1897
  pOperator->status = OP_RES_TO_RETURN;
2,429,148✔
1898

1899
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
2,429,148✔
1900
  QUERY_CHECK_CODE(code, lino, _end);
2,429,148✔
1901
  pInfo->cleanGroupResInfo = true;
2,429,148✔
1902

1903
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
2,429,148✔
1904
  QUERY_CHECK_CODE(code, lino, _end);
2,429,148✔
1905
  while (1) {
×
1906
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
2,429,148✔
1907
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
2,429,148✔
1908
    QUERY_CHECK_CODE(code, lino, _end);
2,429,148✔
1909

1910
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
2,429,148✔
1911
    if (!hasRemain) {
2,429,148✔
1912
      setOperatorCompleted(pOperator);
2,421,348✔
1913
      break;
2,421,348✔
1914
    }
1915

1916
    if (pBInfo->pRes->info.rows > 0) {
7,800✔
1917
      break;
7,800✔
1918
    }
1919
  }
1920
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
2,429,148✔
1921

1922
_end:
2,429,148✔
1923
  if (code != TSDB_CODE_SUCCESS) {
2,429,148✔
1924
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1925
    pTaskInfo->code = code;
×
1926
    T_LONG_JMP(pTaskInfo->env, code);
×
1927
  }
1928
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
2,429,148✔
1929
  return code;
2,429,148✔
1930
}
1931

1932
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
40,944✔
1933
  SStateWindowOperatorInfo* pInfo = pOper->info;
40,944✔
1934
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
40,944✔
1935
  SStateWindowPhysiNode* pPhynode = (SStateWindowPhysiNode*)pOper->pPhyNode;
40,944✔
1936
  pOper->status = OP_NOT_OPENED;
40,944✔
1937

1938
  resetBasicOperatorState(&pInfo->binfo);
40,944✔
1939
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
40,944✔
1940
                    pInfo->cleanGroupResInfo);
40,578✔
1941

1942
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
40,944✔
1943
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
40,944✔
1944
  if (code == 0) {
40,944✔
1945
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
81,888✔
1946
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
40,944✔
1947
                       &pTaskInfo->storageAPI.functionStore);
1948
  }
1949
  if (code == 0) {
40,944✔
1950
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->window.pExprs, NULL,
40,944✔
1951
                         &pTaskInfo->storageAPI.functionStore);
1952
  }
1953

1954
  pInfo->cleanGroupResInfo = false;
40,944✔
1955
  pInfo->hasKey = false;
40,944✔
1956
  pInfo->winSup.lastTs = INT64_MIN;
40,944✔
1957
  cleanupGroupResInfo(&pInfo->groupResInfo);
40,944✔
1958
  memset(pInfo->stateKey.pData, 0, pInfo->stateKey.bytes);
40,944✔
1959
  return code;
40,578✔
1960
}
1961

1962
// todo make this as an non-blocking operator
1963
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhysiNode* pStateNode,
1,972,392✔
1964
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1965
  QRY_PARAM_CHECK(pOptrInfo);
1,972,392✔
1966

1967
  int32_t                   code = TSDB_CODE_SUCCESS;
1,972,392✔
1968
  int32_t                   lino = 0;
1,972,392✔
1969
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
1,972,392✔
1970
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,972,392✔
1971
  if (pInfo == NULL || pOperator == NULL) {
1,972,392✔
1972
    code = terrno;
×
1973
    goto _error;
×
1974
  }
1975

1976
  pOperator->pPhyNode = pStateNode;
1,972,392✔
1977
  pOperator->exprSupp.hasWindowOrGroup = true;
1,972,392✔
1978
  pOperator->exprSupp.hasWindow = true;
1,972,392✔
1979
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
1,972,392✔
1980
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
1,972,392✔
1981

1982
  if (pStateNode->window.pExprs != NULL) {
1,972,392✔
1983
    int32_t    numOfScalarExpr = 0;
1,847,590✔
1984
    SExprInfo* pScalarExprInfo = NULL;
1,847,590✔
1985
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
1,847,590✔
1986
    QUERY_CHECK_CODE(code, lino, _error);
1,847,590✔
1987

1988
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
1,847,590✔
1989
    if (code != TSDB_CODE_SUCCESS) {
1,847,590✔
1990
      goto _error;
×
1991
    }
1992
  }
1993

1994
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
1,972,392✔
1995
  pInfo->stateKey.type = pInfo->stateCol.type;
1,972,392✔
1996
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
1,972,392✔
1997
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
1,972,392✔
1998
  if (pInfo->stateKey.pData == NULL) {
1,972,392✔
1999
    goto _error;
×
2000
  }
2001
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
1,972,392✔
2002
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
1,972,392✔
2003

2004
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,972,392✔
2005
                            pTaskInfo->pStreamRuntimeInfo);
1,972,392✔
2006
  if (code != TSDB_CODE_SUCCESS) {
1,972,392✔
2007
    goto _error;
×
2008
  }
2009

2010
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,972,392✔
2011

2012
  int32_t    num = 0;
1,972,392✔
2013
  SExprInfo* pExprInfo = NULL;
1,972,392✔
2014
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
1,972,392✔
2015
  QUERY_CHECK_CODE(code, lino, _error);
1,972,392✔
2016

2017
  initResultSizeInfo(&pOperator->resultInfo, 4096);
1,972,392✔
2018

2019
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
3,944,784✔
2020
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
1,972,392✔
2021
  if (code != TSDB_CODE_SUCCESS) {
1,972,392✔
2022
    goto _error;
×
2023
  }
2024

2025
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
1,972,392✔
2026
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,972,392✔
2027
  initBasicInfo(&pInfo->binfo, pResBlock);
1,972,392✔
2028
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1,972,392✔
2029

2030
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
1,972,392✔
2031
  QUERY_CHECK_CODE(code, lino, _error);
1,972,392✔
2032

2033
  pInfo->tsSlotId = tsSlotId;
1,972,392✔
2034
  pInfo->pOperator = pOperator;
1,972,392✔
2035
  pInfo->cleanGroupResInfo = false;
1,972,392✔
2036
  pInfo->extendOption = pStateNode->extendOption;
1,972,392✔
2037
  pInfo->trueForLimit = pStateNode->trueForLimit;
1,972,392✔
2038
  pInfo->winSup.lastTs = INT64_MIN;
1,972,392✔
2039

2040
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
1,972,392✔
2041
                  pTaskInfo);
2042
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
1,972,392✔
2043
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2044
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
1,972,392✔
2045

2046
  code = appendDownstream(pOperator, &downstream, 1);
1,972,392✔
2047
  if (code != TSDB_CODE_SUCCESS) {
1,972,392✔
2048
    goto _error;
×
2049
  }
2050

2051
  *pOptrInfo = pOperator;
1,972,392✔
2052
  return TSDB_CODE_SUCCESS;
1,972,392✔
2053

2054
_error:
×
2055
  if (pInfo != NULL) {
×
2056
    destroyStateWindowOperatorInfo(pInfo);
×
2057
  }
2058

2059
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2060
  pTaskInfo->code = code;
×
2061
  return code;
×
2062
}
2063

2064
void destroySWindowOperatorInfo(void* param) {
2,492,268✔
2065
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
2,492,268✔
2066
  if (pInfo == NULL) {
2,492,268✔
2067
    return;
×
2068
  }
2069

2070
  cleanupBasicInfo(&pInfo->binfo);
2,492,268✔
2071
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
2,492,268✔
2072
  if (pInfo->pOperator) {
2,492,268✔
2073
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
2,492,268✔
2074
                      pInfo->cleanGroupResInfo);
2,492,268✔
2075
    pInfo->pOperator = NULL;
2,492,268✔
2076
  }
2077

2078
  cleanupAggSup(&pInfo->aggSup);
2,492,268✔
2079
  cleanupExprSupp(&pInfo->scalarSupp);
2,492,268✔
2080

2081
  cleanupGroupResInfo(&pInfo->groupResInfo);
2,492,268✔
2082
  taosMemoryFreeClear(param);
2,492,268✔
2083
}
2084

2085
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
1,416✔
2086
  SSessionAggOperatorInfo* pInfo = pOper->info;
1,416✔
2087
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
1,416✔
2088
  SSessionWinodwPhysiNode* pPhynode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
1,416✔
2089
  pOper->status = OP_NOT_OPENED;
1,416✔
2090

2091
  resetBasicOperatorState(&pInfo->binfo);
1,416✔
2092
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,416✔
2093
                    pInfo->cleanGroupResInfo);
1,416✔
2094

2095
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,416✔
2096
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
1,416✔
2097
  if (code == 0) {
1,416✔
2098
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
2,832✔
2099
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
1,416✔
2100
                       &pTaskInfo->storageAPI.functionStore);
2101
  }
2102
  if (code == 0) {
1,416✔
2103
    code = resetExprSupp(&pInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
1,416✔
2104
                         &pTaskInfo->storageAPI.functionStore);
2105
  }
2106

2107
  pInfo->cleanGroupResInfo = false;
1,416✔
2108
  pInfo->winSup = (SWindowRowsSup){0};
1,416✔
2109
  pInfo->winSup.prevTs = INT64_MIN;
1,416✔
2110
  pInfo->reptScan = false;
1,416✔
2111

2112
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,416✔
2113
  return code;
1,416✔
2114
}
2115

2116
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
2,492,268✔
2117
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2118
  QRY_PARAM_CHECK(pOptrInfo);
2,492,268✔
2119

2120
  int32_t                  code = TSDB_CODE_SUCCESS;
2,492,268✔
2121
  int32_t                  lino = 0;
2,492,268✔
2122
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
2,492,268✔
2123
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,492,268✔
2124
  if (pInfo == NULL || pOperator == NULL) {
2,492,268✔
2125
    code = terrno;
×
2126
    goto _error;
×
2127
  }
2128

2129
  pOperator->pPhyNode = pSessionNode;
2,492,268✔
2130
  pOperator->exprSupp.hasWindowOrGroup = true;
2,492,268✔
2131
  pOperator->exprSupp.hasWindow = true;
2,492,268✔
2132

2133
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,492,268✔
2134
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2,492,268✔
2135

2136
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
2,492,268✔
2137
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,492,268✔
2138
  initBasicInfo(&pInfo->binfo, pResBlock);
2,492,268✔
2139

2140
  int32_t    numOfCols = 0;
2,492,268✔
2141
  SExprInfo* pExprInfo = NULL;
2,492,268✔
2142
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
2,492,268✔
2143
  QUERY_CHECK_CODE(code, lino, _error);
2,492,268✔
2144

2145
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
4,984,536✔
2146
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
2,492,268✔
2147
  QUERY_CHECK_CODE(code, lino, _error);
2,492,268✔
2148

2149
  pInfo->gap = pSessionNode->gap;
2,492,268✔
2150

2151
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
2,492,268✔
2152
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
2,492,268✔
2153
  QUERY_CHECK_CODE(code, lino, _error);
2,492,268✔
2154

2155
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
2,492,268✔
2156
  pInfo->binfo.pRes = pResBlock;
2,492,268✔
2157
  pInfo->winSup.prevTs = INT64_MIN;
2,492,268✔
2158
  pInfo->reptScan = false;
2,492,268✔
2159
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
2,492,268✔
2160
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
2,492,268✔
2161

2162
  if (pSessionNode->window.pExprs != NULL) {
2,492,268✔
2163
    int32_t    numOfScalar = 0;
449✔
2164
    SExprInfo* pScalarExprInfo = NULL;
449✔
2165
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
449✔
2166
    QUERY_CHECK_CODE(code, lino, _error);
449✔
2167

2168
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
449✔
2169
    QUERY_CHECK_CODE(code, lino, _error);
449✔
2170
  }
2171

2172
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
2,492,268✔
2173
                            pTaskInfo->pStreamRuntimeInfo);
2,492,268✔
2174
  QUERY_CHECK_CODE(code, lino, _error);
2,492,268✔
2175

2176
  pInfo->pOperator = pOperator;
2,492,268✔
2177
  pInfo->cleanGroupResInfo = false;
2,492,268✔
2178
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
2,492,268✔
2179
                  pInfo, pTaskInfo);
2180
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
2,492,268✔
2181
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2182
  pOperator->pTaskInfo = pTaskInfo;
2,492,268✔
2183
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
2,492,268✔
2184

2185
  code = appendDownstream(pOperator, &downstream, 1);
2,492,268✔
2186
  QUERY_CHECK_CODE(code, lino, _error);
2,492,268✔
2187

2188
  *pOptrInfo = pOperator;
2,492,268✔
2189
  return TSDB_CODE_SUCCESS;
2,492,268✔
2190

2191
_error:
×
2192
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2193
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2194
  pTaskInfo->code = code;
×
2195
  return code;
×
2196
}
2197

2198
void destroyMAIOperatorInfo(void* param) {
1,232,298✔
2199
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
1,232,298✔
2200
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
1,232,298✔
2201
  taosMemoryFreeClear(param);
1,232,298✔
2202
}
1,232,298✔
2203

2204
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
1,143,590✔
2205
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
1,143,590✔
2206
  if (NULL == pResult) {
1,143,590✔
2207
    return pResult;
×
2208
  }
2209
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
1,143,590✔
2210
  return pResult;
1,143,590✔
2211
}
2212

2213
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
858,696,536✔
2214
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2215
  if (*pResult == NULL) {
858,696,536✔
2216
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
1,143,590✔
2217
    if (*pResult == NULL) {
1,143,590✔
2218
      return terrno;
×
2219
    }
2220
  }
2221

2222
  // set time window for current result
2223
  (*pResult)->win = (*win);
858,696,536✔
2224
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
858,696,536✔
2225
}
2226

2227
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
4,404,993✔
2228
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
2229
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
4,404,993✔
2230
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
4,404,993✔
2231

2232
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
4,404,993✔
2233
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
4,404,993✔
2234
  SInterval*     pInterval = &iaInfo->interval;
4,404,993✔
2235

2236
  int32_t  startPos = 0;
4,404,993✔
2237
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
4,404,993✔
2238

2239
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
4,404,993✔
2240

2241
  // there is an result exists
2242
  if (miaInfo->curTs != INT64_MIN) {
4,404,993✔
2243
    if (ts != miaInfo->curTs) {
1,173,384✔
2244
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
1,124,740✔
2245
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
1,124,740✔
2246
      miaInfo->curTs = ts;
1,124,740✔
2247
    }
2248
  } else {
2249
    miaInfo->curTs = ts;
3,231,609✔
2250
  }
2251

2252
  STimeWindow win = {0};
4,404,993✔
2253
  win.skey = miaInfo->curTs;
4,404,993✔
2254
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
4,404,993✔
2255

2256
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
4,404,993✔
2257
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
4,404,993✔
2258
    T_LONG_JMP(pTaskInfo->env, ret);
×
2259
  }
2260

2261
  int32_t currPos = startPos;
4,404,993✔
2262

2263
  STimeWindow currWin = win;
4,404,993✔
2264
  while (++currPos < pBlock->info.rows) {
1,808,423,733✔
2265
    if (tsCols[currPos] == miaInfo->curTs) {
1,804,003,820✔
2266
      continue;
949,727,570✔
2267
    }
2268

2269
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
854,281,845✔
2270
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
1,708,570,404✔
2271
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
854,285,202✔
2272
    if (ret != TSDB_CODE_SUCCESS) {
854,285,202✔
2273
      T_LONG_JMP(pTaskInfo->env, ret);
×
2274
    }
2275

2276
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
854,285,202✔
2277
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
854,292,289✔
2278
    miaInfo->curTs = tsCols[currPos];
854,292,289✔
2279

2280
    currWin.skey = miaInfo->curTs;
854,292,289✔
2281
    currWin.ekey =
854,291,543✔
2282
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
854,292,289✔
2283

2284
    startPos = currPos;
854,291,543✔
2285
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
854,291,543✔
2286
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
854,291,170✔
2287
      T_LONG_JMP(pTaskInfo->env, ret);
×
2288
    }
2289

2290
    miaInfo->curTs = currWin.skey;
854,291,170✔
2291
  }
2292

2293
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
4,404,993✔
2294
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
8,809,986✔
2295
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
4,404,993✔
2296
  if (ret != TSDB_CODE_SUCCESS) {
4,404,993✔
2297
    T_LONG_JMP(pTaskInfo->env, ret);
×
2298
  }
2299
}
4,404,993✔
2300

2301
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
3,223,074✔
2302
  pRes->info.id.groupId = pMiaInfo->groupId;
3,223,074✔
2303
  pMiaInfo->curTs = INT64_MIN;
3,223,074✔
2304
  pMiaInfo->groupId = 0;
3,223,074✔
2305
}
3,223,074✔
2306

2307
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
4,061,103✔
2308
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
4,061,103✔
2309
  int32_t                               code = TSDB_CODE_SUCCESS;
4,061,103✔
2310
  int32_t                               lino = 0;
4,061,103✔
2311
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
4,061,103✔
2312
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
4,061,103✔
2313

2314
  SExprSupp*      pSup = &pOperator->exprSupp;
4,061,103✔
2315
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
4,061,103✔
2316
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
4,061,103✔
2317
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
4,061,103✔
2318

2319
  while (1) {
3,636,256✔
2320
    SSDataBlock* pBlock = NULL;
7,697,359✔
2321
    if (pMiaInfo->prefetchedBlock == NULL) {
7,697,359✔
2322
      pBlock = getNextBlockFromDownstream(pOperator, 0);
5,609,340✔
2323
    } else {
2324
      pBlock = pMiaInfo->prefetchedBlock;
2,088,019✔
2325
      pMiaInfo->prefetchedBlock = NULL;
2,088,019✔
2326

2327
      pMiaInfo->groupId = pBlock->info.id.groupId;
2,088,019✔
2328
    }
2329

2330
    // no data exists, all query processing is done
2331
    if (pBlock == NULL) {
7,697,359✔
2332
      // close last unclosed time window
2333
      if (pMiaInfo->curTs != INT64_MIN) {
1,204,347✔
2334
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
1,135,055✔
2335
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
1,135,055✔
2336
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
1,135,055✔
2337
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,135,055✔
2338
        QUERY_CHECK_CODE(code, lino, _end);
1,135,055✔
2339
      }
2340

2341
      setOperatorCompleted(pOperator);
1,204,347✔
2342
      break;
1,204,347✔
2343
    }
2344

2345
    if (pMiaInfo->groupId == 0) {
6,493,012✔
2346
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
1,748,314✔
2347
        pMiaInfo->groupId = pBlock->info.id.groupId;
215,056✔
2348
        pRes->info.id.groupId = pMiaInfo->groupId;
215,056✔
2349
      }
2350
    } else {
2351
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
4,744,698✔
2352
        // if there are unclosed time window, close it firstly.
2353
        if (pMiaInfo->curTs == INT64_MIN) {
2,088,019✔
2354
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2355
          T_LONG_JMP(pTaskInfo->env, terrno);
×
2356
        }
2357
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
2,088,019✔
2358
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
2,088,019✔
2359

2360
        pMiaInfo->prefetchedBlock = pBlock;
2,088,019✔
2361
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
2,088,019✔
2362
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
2,088,019✔
2363
        QUERY_CHECK_CODE(code, lino, _end);
2,088,019✔
2364
        if (pRes->info.rows == 0) {
2,088,019✔
2365
          // After filtering for last group, the result is empty, so we need to continue to process next group
2366
          continue;
12,682✔
2367
        } else {
2368
          break;
2,075,337✔
2369
        }
2370
      } else {
2371
        // continue
2372
        pRes->info.id.groupId = pMiaInfo->groupId;
2,656,679✔
2373
      }
2374
    }
2375

2376
    pRes->info.scanFlag = pBlock->info.scanFlag;
4,404,993✔
2377
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
4,404,993✔
2378
    QUERY_CHECK_CODE(code, lino, _end);
4,404,993✔
2379

2380
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
4,404,993✔
2381

2382
    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
4,404,993✔
2383
    QUERY_CHECK_CODE(code, lino, _end);
4,404,993✔
2384

2385
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
4,404,993✔
2386
      break;
781,419✔
2387
    }
2388
  }
2389

2390
_end:
4,061,103✔
2391
  if (code != TSDB_CODE_SUCCESS) {
4,061,103✔
2392
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2393
    pTaskInfo->code = code;
×
2394
    T_LONG_JMP(pTaskInfo->env, code);
×
2395
  }
2396
}
4,061,103✔
2397

2398
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
4,361,175✔
2399
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
4,361,175✔
2400
  int32_t                               code = TSDB_CODE_SUCCESS;
4,361,175✔
2401
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
4,361,175✔
2402
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
4,361,175✔
2403
  if (pOperator->status == OP_EXEC_DONE) {
4,361,175✔
2404
    (*ppRes) = NULL;
1,150,666✔
2405
    return code;
1,150,666✔
2406
  }
2407

2408
  SSDataBlock* pRes = iaInfo->binfo.pRes;
3,210,509✔
2409
  blockDataCleanup(pRes);
3,210,509✔
2410

2411
  if (iaInfo->binfo.mergeResultBlock) {
3,210,509✔
2412
    while (1) {
2413
      if (pOperator->status == OP_EXEC_DONE) {
4,178,098✔
2414
        break;
870,560✔
2415
      }
2416

2417
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
3,307,538✔
2418
        break;
793,192✔
2419
      }
2420

2421
      doMergeAlignedIntervalAgg(pOperator);
2,514,346✔
2422
    }
2423
  } else {
2424
    doMergeAlignedIntervalAgg(pOperator);
1,546,757✔
2425
  }
2426

2427
  size_t rows = pRes->info.rows;
3,210,509✔
2428
  pOperator->resultInfo.totalRows += rows;
3,210,509✔
2429
  (*ppRes) = (rows == 0) ? NULL : pRes;
3,210,509✔
2430
  return code;
3,210,509✔
2431
}
2432

2433
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
×
2434
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;
×
2435
  
2436
  uint64_t     groupId;  // current groupId
2437
  int64_t      curTs;    // current ts
2438
  SSDataBlock* prefetchedBlock;
2439
  SResultRow*  pResultRow;
2440

2441
  pInfo->groupId = 0;
×
2442
  pInfo->curTs = INT64_MIN;
×
2443
  pInfo->prefetchedBlock = NULL;
×
2444
  pInfo->pResultRow = NULL;
×
2445

2446
  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
×
2447
}
2448

2449
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
1,232,298✔
2450
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2451
  QRY_PARAM_CHECK(pOptrInfo);
1,232,298✔
2452

2453
  int32_t                               code = TSDB_CODE_SUCCESS;
1,232,298✔
2454
  int32_t                               lino = 0;
1,232,298✔
2455
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
1,232,298✔
2456
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,232,298✔
2457
  if (miaInfo == NULL || pOperator == NULL) {
1,232,298✔
2458
    code = terrno;
×
2459
    goto _error;
×
2460
  }
2461

2462
  pOperator->pPhyNode = pNode;
1,232,298✔
2463
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
1,232,298✔
2464
  if (miaInfo->intervalAggOperatorInfo == NULL) {
1,232,298✔
2465
    code = terrno;
×
2466
    goto _error;
×
2467
  }
2468

2469
  SInterval interval = {.interval = pNode->interval,
3,696,894✔
2470
                        .sliding = pNode->sliding,
1,232,298✔
2471
                        .intervalUnit = pNode->intervalUnit,
1,232,298✔
2472
                        .slidingUnit = pNode->slidingUnit,
1,232,298✔
2473
                        .offset = pNode->offset,
1,232,298✔
2474
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
1,232,298✔
2475
                        .timeRange = pNode->timeRange};
2476
  calcIntervalAutoOffset(&interval);
1,232,298✔
2477

2478
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
1,232,298✔
2479
  SExprSupp*                pSup = &pOperator->exprSupp;
1,232,298✔
2480
  pSup->hasWindowOrGroup = true;
1,232,298✔
2481
  pSup->hasWindow = true;
1,232,298✔
2482

2483
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,232,298✔
2484
                            pTaskInfo->pStreamRuntimeInfo);
1,232,298✔
2485
  QUERY_CHECK_CODE(code, lino, _error);
1,232,298✔
2486

2487
  miaInfo->curTs = INT64_MIN;
1,232,298✔
2488
  iaInfo->win = pTaskInfo->window;
1,232,298✔
2489
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
1,232,298✔
2490
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
1,232,298✔
2491
  iaInfo->interval = interval;
1,232,298✔
2492
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
1,232,298✔
2493
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
1,232,298✔
2494

2495
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,232,298✔
2496
  initResultSizeInfo(&pOperator->resultInfo, 512);
1,232,298✔
2497

2498
  int32_t    num = 0;
1,232,298✔
2499
  SExprInfo* pExprInfo = NULL;
1,232,298✔
2500
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
1,232,298✔
2501
  QUERY_CHECK_CODE(code, lino, _error);
1,232,298✔
2502

2503
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
2,464,596✔
2504
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
1,232,298✔
2505
  QUERY_CHECK_CODE(code, lino, _error);
1,232,298✔
2506

2507
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
1,232,298✔
2508
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,232,298✔
2509
  initBasicInfo(&iaInfo->binfo, pResBlock);
1,232,298✔
2510
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
1,232,298✔
2511
  QUERY_CHECK_CODE(code, lino, _error);
1,232,298✔
2512

2513
  iaInfo->timeWindowInterpo = false;
1,232,298✔
2514
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
1,232,298✔
2515
  QUERY_CHECK_CODE(code, lino, _error);
1,232,298✔
2516
  if (iaInfo->timeWindowInterpo) {
1,232,298✔
2517
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2518
  }
2519

2520
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
1,232,298✔
2521
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
1,232,298✔
2522
  QUERY_CHECK_CODE(code, lino, _error);
1,232,298✔
2523
  iaInfo->pOperator = pOperator;
1,232,298✔
2524
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
1,232,298✔
2525
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2526

2527
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
1,232,298✔
2528
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2529
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
1,232,298✔
2530

2531
  code = appendDownstream(pOperator, &downstream, 1);
1,232,298✔
2532
  QUERY_CHECK_CODE(code, lino, _error);
1,232,298✔
2533

2534
  *pOptrInfo = pOperator;
1,232,298✔
2535
  return TSDB_CODE_SUCCESS;
1,232,298✔
2536

2537
_error:
×
2538
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2539
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2540
  pTaskInfo->code = code;
×
2541
  return code;
×
2542
}
2543

2544
//=====================================================================================================================
2545
// merge interval operator
2546
typedef struct SMergeIntervalAggOperatorInfo {
2547
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2548
  SList*                   groupIntervals;
2549
  SListIter                groupIntervalsIter;
2550
  bool                     hasGroupId;
2551
  uint64_t                 groupId;
2552
  SSDataBlock*             prefetchedBlock;
2553
  bool                     inputBlocksFinished;
2554
} SMergeIntervalAggOperatorInfo;
2555

2556
typedef struct SGroupTimeWindow {
2557
  uint64_t    groupId;
2558
  STimeWindow window;
2559
} SGroupTimeWindow;
2560

2561
void destroyMergeIntervalOperatorInfo(void* param) {
×
2562
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2563
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2564
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2565

2566
  taosMemoryFreeClear(param);
×
2567
}
×
2568

2569
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2570
                                        STimeWindow* newWin) {
2571
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2572
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2573
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2574

2575
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2576
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2577
  if (code != TSDB_CODE_SUCCESS) {
×
2578
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2579
    return code;
×
2580
  }
2581

2582
  SListIter iter = {0};
×
2583
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2584
  SListNode* listNode = NULL;
×
2585
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2586
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2587
    if (prevGrpWin->groupId != tableGroupId) {
×
2588
      continue;
×
2589
    }
2590

2591
    STimeWindow* prevWin = &prevGrpWin->window;
×
2592
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2593
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2594
      taosMemoryFreeClear(tmp);
×
2595
    }
2596
  }
2597

2598
  return TSDB_CODE_SUCCESS;
×
2599
}
2600

2601
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
2602
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
2603
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2604
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2605

2606
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
×
2607
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;
×
2608

2609
  int32_t     startPos = 0;
×
2610
  int32_t     numOfOutput = pExprSup->numOfExprs;
×
2611
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
×
2612
  uint64_t    tableGroupId = pBlock->info.id.groupId;
×
2613
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2614
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
×
2615
  SResultRow* pResult = NULL;
×
2616

2617
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
×
2618
                                        iaInfo->binfo.inputTsOrder);
2619

2620
  int32_t ret =
2621
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2622
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2623
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2624
    T_LONG_JMP(pTaskInfo->env, ret);
×
2625
  }
2626

2627
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
×
2628
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2629
                                                 iaInfo->binfo.inputTsOrder);
2630
  if (forwardRows <= 0) {
×
2631
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2632
  }
2633

2634
  // prev time window not interpolation yet.
2635
  if (iaInfo->timeWindowInterpo) {
×
2636
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
×
2637
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
×
2638

2639
    // restore current time window
2640
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2641
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2642
    if (ret != TSDB_CODE_SUCCESS) {
×
2643
      T_LONG_JMP(pTaskInfo->env, ret);
×
2644
    }
2645

2646
    // window start key interpolation
2647
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
×
2648
    if (ret != TSDB_CODE_SUCCESS) {
×
2649
      T_LONG_JMP(pTaskInfo->env, ret);
×
2650
    }
2651
  }
2652

2653
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
×
2654
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2655
                                        forwardRows, pBlock->info.rows, numOfOutput);
×
2656
  if (ret != TSDB_CODE_SUCCESS) {
×
2657
    T_LONG_JMP(pTaskInfo->env, ret);
×
2658
  }
2659
  doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2660

2661
  // output previous interval results after this interval (&win) is closed
2662
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
×
2663
  if (code != TSDB_CODE_SUCCESS) {
×
2664
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2665
    T_LONG_JMP(pTaskInfo->env, code);
×
2666
  }
2667

2668
  STimeWindow nextWin = win;
×
2669
  while (1) {
×
2670
    int32_t prevEndPos = forwardRows - 1 + startPos;
×
2671
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
×
2672
                                      iaInfo->binfo.inputTsOrder);
2673
    if (startPos < 0) {
×
2674
      break;
×
2675
    }
2676

2677
    // null data, failed to allocate more memory buffer
2678
    code =
2679
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
×
2680
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2681
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2682
      T_LONG_JMP(pTaskInfo->env, code);
×
2683
    }
2684

2685
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
×
2686
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2687
                                           iaInfo->binfo.inputTsOrder);
2688

2689
    // window start(end) key interpolation
2690
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
×
2691
    if (code != TSDB_CODE_SUCCESS) {
×
2692
      T_LONG_JMP(pTaskInfo->env, code);
×
2693
    }
2694

2695
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
×
2696
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2697
                                           forwardRows, pBlock->info.rows, numOfOutput);
×
2698
    if (code != TSDB_CODE_SUCCESS) {
×
2699
      T_LONG_JMP(pTaskInfo->env, code);
×
2700
    }
2701
    doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2702

2703
    // output previous interval results after this interval (&nextWin) is closed
2704
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
×
2705
    if (code != TSDB_CODE_SUCCESS) {
×
2706
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2707
      T_LONG_JMP(pTaskInfo->env, code);
×
2708
    }
2709
  }
2710

2711
  if (iaInfo->timeWindowInterpo) {
×
2712
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
×
2713
  }
2714
}
×
2715

2716
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
2717
  int32_t        code = TSDB_CODE_SUCCESS;
×
2718
  int32_t        lino = 0;
×
2719
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
2720

2721
  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
×
2722
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2723
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;
×
2724

2725
  if (pOperator->status == OP_EXEC_DONE) {
×
2726
    (*ppRes) = NULL;
×
2727
    return code;
×
2728
  }
2729

2730
  SSDataBlock* pRes = iaInfo->binfo.pRes;
×
2731
  blockDataCleanup(pRes);
×
2732
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
×
2733
  QUERY_CHECK_CODE(code, lino, _end);
×
2734

2735
  if (!miaInfo->inputBlocksFinished) {
×
2736
    SOperatorInfo* downstream = pOperator->pDownstream[0];
×
2737
    while (1) {
×
2738
      SSDataBlock* pBlock = NULL;
×
2739
      if (miaInfo->prefetchedBlock == NULL) {
×
2740
        pBlock = getNextBlockFromDownstream(pOperator, 0);
×
2741
      } else {
2742
        pBlock = miaInfo->prefetchedBlock;
×
2743
        miaInfo->groupId = pBlock->info.id.groupId;
×
2744
        miaInfo->prefetchedBlock = NULL;
×
2745
      }
2746

2747
      if (pBlock == NULL) {
×
2748
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
×
2749
        miaInfo->inputBlocksFinished = true;
×
2750
        break;
×
2751
      }
2752

2753
      if (!miaInfo->hasGroupId) {
×
2754
        miaInfo->hasGroupId = true;
×
2755
        miaInfo->groupId = pBlock->info.id.groupId;
×
2756
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
×
2757
        miaInfo->prefetchedBlock = pBlock;
×
2758
        break;
×
2759
      }
2760

2761
      pRes->info.scanFlag = pBlock->info.scanFlag;
×
2762
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
×
2763
      QUERY_CHECK_CODE(code, lino, _end);
×
2764

2765
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);
×
2766

2767
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
×
2768
        break;
×
2769
      }
2770
    }
2771

2772
    pRes->info.id.groupId = miaInfo->groupId;
×
2773
  }
2774

2775
  if (miaInfo->inputBlocksFinished) {
×
2776
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
×
2777

2778
    if (listNode != NULL) {
×
2779
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
×
2780
      pRes->info.id.groupId = grpWin->groupId;
×
2781
    }
2782
  }
2783

2784
  if (pRes->info.rows == 0) {
×
2785
    setOperatorCompleted(pOperator);
×
2786
  }
2787

2788
  size_t rows = pRes->info.rows;
×
2789
  pOperator->resultInfo.totalRows += rows;
×
2790

2791
_end:
×
2792
  if (code != TSDB_CODE_SUCCESS) {
×
2793
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2794
    pTaskInfo->code = code;
×
2795
    T_LONG_JMP(pTaskInfo->env, code);
×
2796
  }
2797
  (*ppRes) = (rows == 0) ? NULL : pRes;
×
2798
  return code;
×
2799
}
2800

2801
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
×
2802
  SMergeIntervalAggOperatorInfo* pInfo = pOper->info;
×
2803

2804
  pInfo->hasGroupId = false;
×
2805
  pInfo->groupId = 0;
×
2806
  pInfo->prefetchedBlock = NULL;
×
2807
  pInfo->inputBlocksFinished = false;
×
2808
  tdListEmpty(pInfo->groupIntervals);
×
2809
  
2810
  SIntervalAggOperatorInfo* pIntervalInfo = &pInfo->intervalAggOperatorInfo;
×
2811
  return resetInterval(pOper, pIntervalInfo);
×
2812
}
2813

2814
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2815
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2816
  QRY_PARAM_CHECK(pOptrInfo);
×
2817

2818
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2819
  int32_t                        lino = 0;
×
2820
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2821
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2822
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2823
    code = terrno;
×
2824
    goto _error;
×
2825
  }
2826

2827
  pOperator->pPhyNode = pIntervalPhyNode;
×
2828
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2829
                        .sliding = pIntervalPhyNode->sliding,
×
2830
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2831
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2832
                        .offset = pIntervalPhyNode->offset,
×
2833
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2834
                        .timeRange = pIntervalPhyNode->timeRange};
2835
  calcIntervalAutoOffset(&interval);
×
2836

2837
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2838

2839
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2840
  pIntervalInfo->win = pTaskInfo->window;
×
2841
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2842
  pIntervalInfo->interval = interval;
×
2843
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2844
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2845
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2846

2847
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2848
  pExprSupp->hasWindowOrGroup = true;
×
2849
  pExprSupp->hasWindow = true;
×
2850

2851
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2852
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2853

2854
  int32_t    num = 0;
×
2855
  SExprInfo* pExprInfo = NULL;
×
2856
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2857
  QUERY_CHECK_CODE(code, lino, _error);
×
2858

2859
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2860
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2861
  if (code != TSDB_CODE_SUCCESS) {
×
2862
    goto _error;
×
2863
  }
2864

2865
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2866
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2867
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2868
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2869
  QUERY_CHECK_CODE(code, lino, _error);
×
2870

2871
  pIntervalInfo->timeWindowInterpo = false;
×
2872
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2873
  QUERY_CHECK_CODE(code, lino, _error);
×
2874
  if (pIntervalInfo->timeWindowInterpo) {
×
2875
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2876
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2877
      goto _error;
×
2878
    }
2879
  }
2880

2881
  pIntervalInfo->pOperator = pOperator;
×
2882
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2883
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2884
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2885
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2886
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2887
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
2888

2889
  code = appendDownstream(pOperator, &downstream, 1);
×
2890
  if (code != TSDB_CODE_SUCCESS) {
×
2891
    goto _error;
×
2892
  }
2893

2894
  *pOptrInfo = pOperator;
×
2895
  return TSDB_CODE_SUCCESS;
×
2896
_error:
×
2897
  if (pMergeIntervalInfo != NULL) {
×
2898
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2899
  }
2900
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2901
  pTaskInfo->code = code;
×
2902
  return code;
×
2903
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc