• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4933

20 Jan 2026 10:44AM UTC coverage: 66.671% (+0.03%) from 66.646%
#4933

push

travis-ci

web-flow
merge: from main to 3.0 #34340

73 of 178 new or added lines in 9 files covered. (41.01%)

1199 existing lines in 124 files now uncovered.

203121 of 304663 relevant lines covered (66.67%)

132228377.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.6
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
/**
 * Selects which border of a time window an interpolation step refers to.
 * The helpers in this file use it to pick between the start/end interpolation
 * flags of a result row and the start/end keys of a function context.
 */
typedef enum SResultTsInterpType {
  RESULT_ROW_START_INTERP = 1,  // window start border
  RESULT_ROW_END_INTERP = 2,    // window end border
} SResultTsInterpType;
36

37
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;
39
  uint64_t           groupId;
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
2,147,483,647✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
2,147,483,647✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
2,147,483,647✔
56
    *pResult = NULL;
×
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
×
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
2,147,483,647✔
63
  *pResult = pResultRow;
2,147,483,647✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
2,147,483,647✔
65
}
66

67
/**
 * @brief Account for one more row in the window that is currently being built.
 *
 * Moves the window end and the remembered previous timestamp to ts, stores the
 * group id and increments the row count. When hasContinuousNullRows() reports
 * pending rows, their count (numNullRows) is added to the window as well and
 * the counter is reset.
 *
 * @param pRowSup   window row bookkeeping to update
 * @param ts        timestamp of the row being kept
 * @param rowIndex  not used by this function
 * @param groupId   group id of the row
 */
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, int32_t rowIndex, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
  ++pRowSup->numOfRows;

  if (!hasContinuousNullRows(pRowSup)) {
    return;
  }

  // rows with a null state column that are enclosed by rows of the same state
  // are counted into the current window
  pRowSup->numOfRows += pRowSup->numNullRows;
  resetNumNullRows(pRowSup);
}
2,147,483,647✔
79

80
/**
 * @brief Begin a new window at the given row.
 *
 * Records the start row index, sets the window start key to tsList[rowIndex],
 * clears the row count, stores the group id and resets the null-row counter.
 *
 * @param pRowSup   window row bookkeeping to reset
 * @param tsList    timestamp column of the current block
 * @param rowIndex  index of the first row of the new window
 * @param groupId   group id of the new window
 */
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
  const int64_t startTs = tsList[rowIndex];

  pRowSup->groupId = groupId;
  pRowSup->win.skey = startTs;
  pRowSup->startRowIndex = rowIndex;
  pRowSup->numOfRows = 0;

  resetNumNullRows(pRowSup);
}
879,956,838✔
87

88
/**
 * @brief Count how many rows, starting at pos, belong to a window that ends at ekey.
 *
 * searchFn is run on the sub-array that begins at pData[pos]; its result is the
 * offset (relative to pos) of the boundary row. Rows from that offset onwards
 * whose timestamp equals ekey are counted as well.
 *
 * The ascending and descending cases perform exactly the same steps (the order
 * is only forwarded to searchFn), so a single code path is used.
 *
 * @param numOfRows  total number of rows in pData
 * @param searchFn   search routine; a negative result means "no boundary found"
 * @param ekey       end key of the window
 * @param pos        index of the first row to consider
 * @param order      scan order, forwarded to searchFn
 * @param pData      timestamp column
 * @return number of rows to move forward, 0 when searchFn reports no boundary
 */
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
  int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
  if (end < 0) {
    return 0;
  }

  int32_t forwardRows = end;

  // Include every row equal to ekey. The index is bounded by numOfRows so that a
  // block whose tail consists only of ekey rows cannot be read past its end.
  for (int32_t idx = pos + end; idx < numOfRows && pData[idx] == ekey; ++idx) {
    forwardRows += 1;
  }

  return forwardRows;
}
124

125
/**
 * @brief Binary search for a timestamp in a sorted key array.
 *
 * For TSDB_ORDER_DESC the array is probed as a descending sequence, for any
 * other order as an ascending one. The first and last probe positions are
 * checked before every halving step, which lets the search return early.
 *
 * @param pValue  key array, interpreted as TSKEY[num]
 * @param num     number of keys in the array
 * @param key     timestamp to look for
 * @param order   TSDB_ORDER_DESC for descending data, otherwise ascending
 * @return index of the matching/boundary position, or -1 when num <= 0 or the
 *         boundary lies beyond the last element
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  TSKEY*  keys = (TSKEY*)pValue;
  int32_t lo = 0;
  int32_t hi = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    for (;;) {
      if (key >= keys[lo]) return lo;
      if (key == keys[hi]) return hi;

      if (key < keys[hi]) {
        hi += 1;
        return (hi >= num) ? -1 : hi;
      }

      int32_t span = hi - lo + 1;
      int32_t mid = (span >> 1) + lo;

      if (key < keys[mid]) {
        lo = mid + 1;
      } else if (key > keys[mid]) {
        hi = mid - 1;
      } else {
        return mid;
      }
    }
  }

  // find the first position which is bigger than the key
  for (;;) {
    if (key <= keys[lo]) return lo;
    if (key == keys[hi]) return hi;

    if (key > keys[hi]) {
      hi = hi + 1;
      return (hi >= num) ? -1 : hi;
    }

    int32_t span = hi - lo + 1;
    int32_t mid = (span >> 1u) + lo;

    if (key < keys[mid]) {
      hi = mid - 1;
    } else if (key > keys[mid]) {
      lo = mid + 1;
    } else {
      return mid;
    }
  }
}
193

194
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
2,147,483,647✔
195
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
196
  int32_t num = -1;
2,147,483,647✔
197
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
2,147,483,647✔
198

199
  if (order == TSDB_ORDER_ASC) {
2,147,483,647✔
200
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
2,147,483,647✔
201
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
2,147,483,647✔
202
      if (item != NULL) {
2,147,483,647✔
203
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
204
      }
205
    } else {
206
      num = pDataBlockInfo->rows - startPos;
20,918,050✔
207
      if (item != NULL) {
24,679,870✔
208
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
209
      }
210
    }
211
  } else {  // desc
212
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
363,889,881✔
213
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
369,985,814✔
214
      if (item != NULL) {
368,212,259✔
215
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
216
      }
217
    } else {
218
      num = pDataBlockInfo->rows - startPos;
344,240✔
219
      if (item != NULL) {
541,479✔
220
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
221
      }
222
    }
223
  }
224

225
  return num;
2,147,483,647✔
226
}
227

228
/**
 * @brief Compute the interpolated border value of a time window for every interpolating function.
 *
 * For each expression whose function is reported by fmIsIntervalInterpoFunc():
 *   - the value at the previous point is read either from pPrevValues (when
 *     prevRowIndex is -1) or from row prevRowIndex of the input column,
 *   - the value at the current point is read from row curRowIndex,
 *   - unless the function is an elapsed function, the value at windowKey is
 *     linearly interpolated between the two points,
 *   - the result is stored in the context's start or end point, depending on type.
 * Functions that do not interpolate only get their start key set to INT64_MIN.
 *
 * @param pPrevValues   saved values of the previous block; slot 0 is read elsewhere
 *                      in this file as the timestamp, value slots are taken from 1 on
 * @param pDataBlock    column array of the current block
 * @param prevTs        timestamp of the previous point
 * @param prevRowIndex  row of the previous point, or -1 to use pPrevValues
 * @param curTs         timestamp of the current point
 * @param curRowIndex   row of the current point
 * @param windowKey     window border timestamp to interpolate at
 * @param type          RESULT_ROW_START_INTERP or RESULT_ROW_END_INTERP
 * @param pSup          expression support providing the function contexts
 */
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
  SqlFunctionCtx* pCtx = pSup->pCtx;

  // advances only for interpolating functions
  int32_t prevValSlot = 1;

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    SqlFunctionCtx* pFn = &pCtx[k];

    if (!fmIsIntervalInterpoFunc(pFn->functionId)) {
      pFn->start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pParam = &pFn->param[0];
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

    double prevVal = 0, curVal = 0, interpVal = 0;
    if (prevRowIndex == -1) {
      SGroupKeys* pSaved = taosArrayGet(pPrevValues, prevValSlot);
      GET_TYPED_DATA(prevVal, double, pColInfo->info.type, pSaved->pData, typeGetTypeModFromColInfo(&pColInfo->info));
    } else {
      GET_TYPED_DATA(prevVal, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
                     typeGetTypeModFromColInfo(&pColInfo->info));
    }

    GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
                   typeGetTypeModFromColInfo(&pColInfo->info));

    SPoint left = (SPoint){.key = prevTs, .val = &prevVal};
    SPoint right = (SPoint){.key = curTs, .val = &curVal};
    SPoint target = (SPoint){.key = windowKey, .val = &interpVal};

    // elapsed functions keep interpVal at 0 and only use the key
    if (!fmIsElapsedFunc(pFn->functionId)) {
      taosGetLinearInterpolationVal(&target, TSDB_DATA_TYPE_DOUBLE, &left, &right, TSDB_DATA_TYPE_DOUBLE, 0);
    }

    if (type == RESULT_ROW_START_INTERP) {
      pFn->start.key = target.key;
      pFn->start.val = interpVal;
    } else {
      pFn->end.key = target.key;
      pFn->end.val = interpVal;
    }

    prevValSlot += 1;
  }
}
10,045,795✔
299

300
/**
 * @brief Mark one window border of every function context as "not interpolated".
 *
 * Sets start.key (for RESULT_ROW_START_INTERP) or end.key (for any other type)
 * of each context to INT64_MIN.
 *
 * @param pCtx         array of function contexts
 * @param numOfOutput  number of entries in pCtx
 * @param type         which border to mark
 */
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  const bool markStart = (type == RESULT_ROW_START_INTERP);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (markStart) {
      pCtx[i].start.key = INT64_MIN;
    } else {
      pCtx[i].end.key = INT64_MIN;
    }
  }
}
648,515✔
311

312
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
5,347,155✔
313
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
314
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
5,347,155✔
315

316
  TSKEY curTs = tsCols[pos];
5,347,155✔
317

318
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
5,347,155✔
319
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
5,347,155✔
320

321
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
322
  // start exactly from this point, no need to do interpolation
323
  TSKEY key = ascQuery ? win->skey : win->ekey;
5,347,155✔
324
  if (key == curTs) {
5,347,155✔
325
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
343,183✔
326
    return true;
343,183✔
327
  }
328

329
  // it is the first time window, no need to do interpolation
330
  if (pTsKey->isNull && pos == 0) {
5,003,972✔
331
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
8,440✔
332
  } else {
333
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
4,995,532✔
334
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
4,995,532✔
335
                              RESULT_ROW_START_INTERP, pSup);
336
  }
337

338
  return true;
5,003,972✔
339
}
340

341
/**
 * @brief Prepare the end-border interpolation of a time window.
 *
 * @param pInfo         interval operator info (input order, saved previous values)
 * @param pSup          expression support providing the function contexts
 * @param endRowIndex   last row of the window in this block
 * @param nextRowIndex  first row after the window
 * @param pDataBlock    column array of the block
 * @param tsCols        timestamp column of the block
 * @param blockEkey     last timestamp of the block in scan direction
 * @param win           the time window
 * @param pRes          [out] false when the window does not end in this block,
 *                      true when the end border has been settled; left untouched
 *                      on error
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 *         nextRowIndex is negative although interpolation is required
 */
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
  const int32_t order = pInfo->binfo.inputTsOrder;
  const TSKEY   actualEndKey = tsCols[endRowIndex];
  const TSKEY   borderKey = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;

  // the window continues in a later block, nothing to interpolate yet
  const bool pastBlockAsc = (order == TSDB_ORDER_ASC) && (borderKey > blockEkey);
  const bool pastBlockDesc = (order == TSDB_ORDER_DESC) && (borderKey < blockEkey);
  if (pastBlockAsc || pastBlockDesc) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = false;
    return TSDB_CODE_SUCCESS;
  }

  // a real data point sits on the border, no interpolation needed
  if (borderKey == actualEndKey) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = true;
    return TSDB_CODE_SUCCESS;
  }

  if (nextRowIndex < 0) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, tsCols[nextRowIndex],
                            nextRowIndex, borderKey, RESULT_ROW_END_INTERP, pSup);
  *pRes = true;
  return TSDB_CODE_SUCCESS;
}
376

377
/**
 * @brief Check whether a window has to be calculated for the range [calStart, calEnd].
 *
 * When interval equals sliding, every window qualifies. Otherwise a window is
 * rejected when it does not overlap the range, or when the block type is
 * STREAM_PULL_DATA and the window starts before calStart.
 *
 * @return true if the window qualifies, false otherwise
 */
bool inCalSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
  if (pInterval->interval == pInterval->sliding) {
    return true;
  }

  const bool outsideRange = (pWin->ekey < calStart) || (pWin->skey > calEnd);
  const bool pullBeforeStart = (blockType == STREAM_PULL_DATA) && (pWin->skey < calStart);

  return !(outsideRange || pullBeforeStart);
}
385

386
bool inSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, const SDataBlockInfo* pBlockInfo) {
2,147,483,647✔
387
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
2,147,483,647✔
388
}
389

390
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
2,147,483,647✔
391
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
392
  bool ascQuery = (order != TSDB_ORDER_DESC);
2,147,483,647✔
393

394
  int32_t precision = pInterval->precision;
2,147,483,647✔
395
  getNextTimeWindow(pInterval, pNext, order);
2,147,483,647✔
396

397
  // next time window is not in current block
398
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
2,147,483,647✔
399
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
2,147,483,647✔
400
    return -1;
13,517,694✔
401
  }
402

403
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
2,147,483,647✔
404
    return -1;
×
405
  }
406

407
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
2,147,483,647✔
408
  int32_t startPos = 0;
2,147,483,647✔
409

410
  // tumbling time window query, a special case of sliding time window query
411
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
2,147,483,647✔
412
    startPos = prevPosition + 1;
2,147,483,647✔
413
  } else {
414
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
58,232,633✔
415
      startPos = 0;
8,652,190✔
416
    } else {
417
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
51,599,710✔
418
    }
419
  }
420
  if(startPos < 0 || startPos >= pDataBlockInfo->rows) {
2,147,483,647✔
421
    return -1;
2,147,483,647✔
422
  }
423

424
  /* interp query with fill should not skip time window */
425
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
426
  //    return startPos;
427
  //  }
428

429
  /*
430
   * This time window does not cover any data, try next time window,
431
   * this case may happen when the time window is too small
432
   */
433
  if (primaryKeys != NULL) {
2,147,483,647✔
434
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
2,147,483,647✔
435
      TSKEY next = primaryKeys[startPos];
1,530,892,468✔
436
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
1,531,801,446✔
437
        pNext->skey = taosTimeTruncate(next, pInterval);
4,729,255✔
438
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
1,528✔
439
      } else {
440
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
1,532,699,634✔
441
        pNext->skey = pNext->ekey - pInterval->interval + 1;
1,533,595,213✔
442
      }
443
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
663,232,056✔
444
      TSKEY next = primaryKeys[startPos];
367,633,903✔
445
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
367,916,071✔
446
        pNext->skey = taosTimeTruncate(next, pInterval);
93,139✔
447
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
448
      } else {
449
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
367,801,703✔
450
        pNext->ekey = pNext->skey + pInterval->interval - 1;
366,452,551✔
451
      }
452
    }
453
  }
454

455
  return startPos;
2,147,483,647✔
456
}
457

458
/**
 * @brief Tell whether the given border of a result row has already been interpolated.
 *
 * @param pResult  result row to inspect
 * @param type     RESULT_ROW_START_INTERP reads startInterp, anything else endInterp
 */
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
  return (type == RESULT_ROW_START_INTERP) ? (pResult->startInterp == true) : (pResult->endInterp == true);
}
465

466
/**
 * @brief Flag the given border of a result row as interpolated.
 *
 * @param pResult  result row to update
 * @param type     RESULT_ROW_START_INTERP sets startInterp, anything else endInterp
 */
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  switch (type) {
    case RESULT_ROW_START_INTERP:
      pResult->startInterp = true;
      break;
    default:
      pResult->endInterp = true;
      break;
  }
}
10,681,780✔
473

474
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
2,147,483,647✔
475
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
476
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
477
  int32_t lino = 0;
2,147,483,647✔
478
  if (!pInfo->timeWindowInterpo) {
2,147,483,647✔
479
    return code;
2,147,483,647✔
480
  }
481

482
  if (pBlock == NULL) {
2,287,840✔
483
    code = TSDB_CODE_INVALID_PARA;
×
484
    return code;
×
485
  }
486

487
  if (pBlock->pDataBlock == NULL) {
2,287,840✔
488
    return code;
×
489
  }
490

491
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
5,347,155✔
492

493
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
5,347,155✔
494
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
5,347,155✔
495
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
5,347,155✔
496
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
5,347,155✔
497
    if (interp) {
5,347,155✔
498
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
5,347,155✔
499
    }
500
  } else {
501
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
×
502
  }
503

504
  // point interpolation does not require the end key time window interpolation.
505
  // interpolation query does not generate the time window end interpolation
506
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
5,347,155✔
507
  if (!done) {
5,347,155✔
508
    int32_t endRowIndex = startPos + forwardRows - 1;
5,347,155✔
509
    int32_t nextRowIndex = endRowIndex + 1;
5,347,155✔
510

511
    // duplicated ts row does not involve in the interpolation of end value for current time window
512
    int32_t x = endRowIndex;
5,347,155✔
513
    while (x > 0) {
5,362,386✔
514
      if (tsCols[x] == tsCols[x - 1]) {
5,350,456✔
515
        x -= 1;
15,231✔
516
      } else {
517
        endRowIndex = x;
5,335,225✔
518
        break;
5,335,225✔
519
      }
520
    }
521

522
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
5,347,155✔
523
    bool  interp = false;
5,347,155✔
524
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
5,347,155✔
525
                                           win, &interp);
526
    QUERY_CHECK_CODE(code, lino, _end);
5,347,155✔
527
    if (interp) {
5,347,155✔
528
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
5,334,625✔
529
    }
530
  } else {
531
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
532
  }
533

534
_end:
5,347,155✔
535
  if (code != TSDB_CODE_SUCCESS) {
5,347,155✔
536
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
537
  }
538
  return code;
5,347,155✔
539
}
540

541
/**
 * @brief Remember, per tracked column, the last non-NULL value of a block.
 *
 * For entry k of pPrevKeys the column described by entry k of pCols is scanned
 * from the last row backwards; the first non-NULL value found is copied into
 * the key's buffer. A column that is NULL in every row leaves its key untouched.
 *
 * @param pPrevKeys  array of SGroupKeys receiving the values
 * @param pBlock     block to read from
 * @param pCols      array of SColumn, parallel to pPrevKeys
 */
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  size_t numOfKeys = taosArrayGetSize(pPrevKeys);
  for (int32_t k = 0; k < numOfKeys; ++k) {
    SColumn*         pCol = taosArrayGet(pCols, k);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    SGroupKeys*      pKey = taosArrayGet(pPrevKeys, k);

    // locate the last row holding a value
    int32_t row = pBlock->info.rows - 1;
    while (row >= 0 && colDataIsNull_s(pColInfo, row)) {
      --row;
    }
    if (row < 0) {
      continue;
    }

    char* val = colDataGetData(pColInfo, row);
    if (!IS_VAR_DATA_TYPE(pKey->type)) {
      memcpy(pKey->pData, val, pKey->bytes);
    } else if (IS_STR_DATA_BLOB(pKey->type)) {
      memcpy(pKey->pData, val, blobDataTLen(val));
    } else {
      memcpy(pKey->pData, val, varDataTLen(val));
    }
  }
}
573

574
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
16,800✔
575
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
576
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
16,800✔
577

578
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
16,800✔
579
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
16,800✔
580

581
  int32_t startPos = 0;
16,800✔
582
  int32_t numOfOutput = pSup->numOfExprs;
16,800✔
583

584
  SResultRow* pResult = NULL;
16,800✔
585

586
  while (1) {
×
587
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
16,800✔
588
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
16,800✔
589
    uint64_t            groupId = pOpenWin->groupId;
16,800✔
590
    SResultRowPosition* p1 = &pOpenWin->pos;
16,800✔
591
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
16,800✔
592
      break;
16,800✔
593
    }
594

595
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
596
    if (NULL == pr) {
×
597
      T_LONG_JMP(pTaskInfo->env, terrno);
×
598
    }
599

600
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
601
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
602
      T_LONG_JMP(pTaskInfo->env, terrno);
×
603
    }
604

605
    if (pr->closed) {
×
606
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
607
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
608
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
609
        T_LONG_JMP(pTaskInfo->env, terrno);
×
610
      }
611
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
612
      taosMemoryFree(pNode);
×
613
      continue;
×
614
    }
615

616
    STimeWindow w = pr->win;
×
617
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
618
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
619
    if (ret != TSDB_CODE_SUCCESS) {
×
620
      T_LONG_JMP(pTaskInfo->env, ret);
×
621
    }
622

623
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
624
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
625
      T_LONG_JMP(pTaskInfo->env, terrno);
×
626
    }
627

628
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
629
    if (!pTsKey) {
×
630
      pTaskInfo->code = terrno;
×
631
      T_LONG_JMP(pTaskInfo->env, terrno);
×
632
    }
633

634
    int64_t prevTs = *(int64_t*)pTsKey->pData;
×
635
    if (groupId == pBlock->info.id.groupId) {
×
636
      TSKEY curTs = pBlock->info.window.skey;
×
637
      if (tsCols != NULL) {
×
638
        curTs = tsCols[startPos];
×
639
      }
640
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
641
                                RESULT_ROW_END_INTERP, pSup);
642
    }
643

644
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
645
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
646

647
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
648
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
649
                                          pBlock->info.rows, numOfExprs);
×
650
    if (ret != TSDB_CODE_SUCCESS) {
×
651
      T_LONG_JMP(pTaskInfo->env, ret);
×
652
    }
653

654
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
655
      closeResultRow(pr);
×
656
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
657
      taosMemoryFree(pNode);
×
658
    } else {  // the remains are can not be closed yet.
659
      break;
×
660
    }
661
  }
662
}
16,800✔
663

664
static bool tsKeyCompFn(void* l, void* r, void* param) {
1,752,320,430✔
665
  TSKEY*                    lTS = (TSKEY*)l;
1,752,320,430✔
666
  TSKEY*                    rTS = (TSKEY*)r;
1,752,320,430✔
667
  SIntervalAggOperatorInfo* pInfo = param;
1,752,320,430✔
668
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
1,752,320,430✔
669
}
670

671
/**
 * @brief Check whether a result row already exists for the window and group.
 *
 * Builds the result-window key from the window start and the group id and
 * looks it up in the operator's result row hash table.
 *
 * @return true if an entry for the key is present
 */
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);

  void* pEntry = tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
  return pEntry != NULL;
}
676

677
/**
678
 * @brief check if cur window should be filtered out by limit info
679
 * @retval true if should be filtered out
680
 * @retval false if not filtering out
681
 * @note If no limit info, we skip filtering.
682
 *       If input/output ts order mismatch, we skip filtering too.
683
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
684
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
685
 *       every tuple in every block.
686
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
687
 */
688
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
2,147,483,647✔
689
                                  SExecTaskInfo* pTaskInfo) {
690
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
691
  int32_t lino = 0;
2,147,483,647✔
692
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
2,147,483,647✔
693
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
636,633,396✔
694
      // if input/output ts order mismatch, no filter
695
  ) {
696
    return false;
2,147,483,647✔
697
  }
698

699
  if (pOperatorInfo->limit == 0) return true;
276,623,118✔
700

701
  if (pOperatorInfo->pBQ == NULL) {
276,637,966✔
702
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
334,668✔
703
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
335,020✔
704
  }
705

706
  bool shouldFilter = false;
276,443,310✔
707
  // if BQ has been full, compare it with top of BQ
708
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
276,443,310✔
709
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
77,130,864✔
710
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
77,130,864✔
711
  }
712
  if (shouldFilter) {
275,703,054✔
713
    return true;
871,890✔
714
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
274,831,164✔
715
    return false;
113,432,538✔
716
  }
717

718
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
719
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
162,377,890✔
720
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
161,832,994✔
721

722
  *((TSKEY*)node.data) = win->skey;
161,832,994✔
723

724
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
161,847,074✔
725
    taosMemoryFree(node.data);
×
726
    return true;
×
727
  }
728

729
_end:
162,383,874✔
730
  if (code != TSDB_CODE_SUCCESS) {
162,039,618✔
731
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
26,048✔
732
    pTaskInfo->code = code;
26,048✔
733
    T_LONG_JMP(pTaskInfo->env, code);
×
734
  }
735
  return false;
162,013,570✔
736
}
737

738
/**
 * @brief Count the leading run of rows, starting at startPos, whose timestamps lie inside the window.
 * @param pDataBlockInfo block info; only its row count is read
 * @param pPrimaryColumn timestamp column of the block
 * @param win            window whose [skey, ekey] range (both inclusive) is tested
 * @param startPos       index of the first row to examine
 * @return number of consecutive rows from startPos that fall inside the window; when no row from
 *         startPos onwards falls outside, the row count minus startPos
 */
int32_t getNumOfRowsInTimeWinUnsorted(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, STimeWindow* win,
                                      int32_t startPos) {
  int32_t total = pDataBlockInfo->rows;
  int32_t pos = startPos;

  while (pos < pDataBlockInfo->rows) {
    bool inside = (pPrimaryColumn[pos] >= win->skey) && (pPrimaryColumn[pos] <= win->ekey);
    if (!inside) {
      // first row outside the window ends the run
      return pos - startPos;
    }
    ++pos;
  }
  return total - startPos;
}
750

751
/**
 * @brief Aggregate one input block into the interval windows it touches.
 *
 * The first window is derived from the block's start timestamp; the remaining rows are then walked
 * window by window. Each window is first passed through filterWindowWithLimit(); for a kept window the
 * output buffer is set up, the aggregate functions are applied to the rows falling into it, and the
 * window is closed via doCloseWindow().
 *
 * @param pOperatorInfo  interval operator (its info is a SIntervalAggOperatorInfo)
 * @param pResultRowInfo result row bookkeeping of the operator
 * @param pBlock         input block; pBlock->info.rows is modified temporarily for unsorted input and
 *                       restored before use
 * @param scanFlag       scan flag of the block; compared against MAIN_SCAN when setting up output buffers
 * @return true when a new group is seen and the slimit on handled groups is exceeded, false otherwise
 * @note Failures are reported through T_LONG_JMP on pTaskInfo->env and never through the return value.
 */
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
  bool                      sorted = pInfo->binfo.inputTsOrder == ORDER_ASC || pInfo->binfo.inputTsOrder == ORDER_DESC;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  SResultRow* pResult = NULL;
  TSKEY       ts = sorted ? getStartTsKey(&pBlock->info.window, tsCols) : tsCols[startPos];

  if (tableGroupId != pInfo->curGroupId) {
    pInfo->handledGroupNum += 1;
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
      return true;
    } else {
      // a new group starts: the bounded queue of the previous group is dropped
      pInfo->curGroupId = tableGroupId;
      destroyBoundedQueue(pInfo->pBQ);
      pInfo->pBQ = NULL;
    }
  }

  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;

  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // A missing result row is a failure even if the call reported success. Do not long-jump with a
    // success code: assuming T_LONG_JMP wraps longjmp(), a value of 0 is turned into 1 by setjmp().
    if (ret == TSDB_CODE_SUCCESS) {
      ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey,
                                                          NULL, pInfo->binfo.inputTsOrder)
                               : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &win, startPos);

  // prev time window not interpolation yet.
  if (pInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
      // pResult is used right below, so a NULL row must be rejected here as well
      if (ret == TSDB_CODE_SUCCESS) {
        ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                        pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  doCloseWindow(pResultRowInfo, pInfo, pResult);

  STimeWindow nextWin = win;
  int32_t     rows = pBlock->info.rows;

  while (startPos < pBlock->info.rows) {
    if (sorted) {
      startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, forwardRows - 1 + startPos,
                                        pInfo->binfo.inputTsOrder);
      if (startPos < 0) {
        break;
      }
    } else {
      // The search runs on the slice [startPos, startPos + forwardRows): the row count is shrunk to
      // forwardRows for the call and restored right after it.
      pBlock->info.rows = forwardRows;
      int32_t newStartOff = forwardRows >= 1
                                ? getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols + startPos,
                                                         forwardRows - 1, pInfo->binfo.inputTsOrder)
                                : -1;
      pBlock->info.rows = rows;
      if (newStartOff >= 0) {
        startPos += newStartOff;
      } else if ((startPos += forwardRows) < pBlock->info.rows) {
        // no window found in the slice: start over from the first row behind it
        getInitialStartTimeWindow(&pInfo->interval, tsCols[startPos], &nextWin, true);
      }
      if (startPos >= pBlock->info.rows) {
        break;
      }
    }

    if (filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
      break;
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                    pInfo->binfo.inputTsOrder)
                         : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &nextWin, startPos);
    if (forwardRows == 0) continue;

    // null data, failed to allocate more memory buffer
    ret = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                 pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
      // same rule as for the first window: never long-jump with a success code
      if (ret == TSDB_CODE_SUCCESS) {
        ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start(end) key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    // TODO: add to open window? how to close the open windows after input blocks exhausted?

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                          pBlock->info.rows, numOfOutput);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    doCloseWindow(pResultRowInfo, pInfo, pResult);
  }

  if (pInfo->timeWindowInterpo) {
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }
  return false;
}
895

896
/**
 * @brief Close a result row whose end border has been interpolated.
 *
 * Nothing happens unless time window interpolation is enabled for the operator and the row carries the
 * RESULT_ROW_END_INTERP mark. Otherwise the row is closed and the head node of the open-window list is
 * popped and freed.
 *
 * @param pResultRowInfo owner of the open-window list
 * @param pInfo          interval operator info (timeWindowInterpo is read)
 * @param pResult        result row to close
 */
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }
  if (!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    return;
  }

  // current result is done in computing final results.
  closeResultRow(pResult);
  SListNode* pHead = tdListPopHead(pResultRowInfo->openWindow);
  taosMemoryFree(pHead);
}
2,147,483,647✔
904

905
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
16,800✔
906
                                       SExecTaskInfo* pTaskInfo) {
907
  int32_t         code = TSDB_CODE_SUCCESS;
16,800✔
908
  int32_t         lino = 0;
16,800✔
909
  SOpenWindowInfo openWin = {0};
16,800✔
910
  openWin.pos.pageId = pResult->pageId;
16,800✔
911
  openWin.pos.offset = pResult->offset;
16,800✔
912
  openWin.groupId = groupId;
16,800✔
913
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
16,800✔
914
  if (pn == NULL) {
16,800✔
915
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
16,800✔
916
    QUERY_CHECK_CODE(code, lino, _end);
16,800✔
917
    return openWin.pos;
16,800✔
918
  }
919

920
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
×
921
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
×
922
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
923
    QUERY_CHECK_CODE(code, lino, _end);
×
924
  }
925

926
_end:
×
927
  if (code != TSDB_CODE_SUCCESS) {
×
928
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
929
    pTaskInfo->code = code;
×
930
    T_LONG_JMP(pTaskInfo->env, code);
×
931
  }
932
  return openWin.pos;
×
933
}
934

935
/**
 * @brief Fetch the primary timestamp column of a block and make sure the block's ts window is set.
 *
 * When the first timestamp is non-zero and the block window is still {0, 0}, the window is recomputed
 * with blockDataUpdateTsWindow(). A first timestamp of 0 only produces a warning.
 *
 * @param pBlock    input block
 * @param pInfo     interval operator info (primaryTsIndex selects the column)
 * @param pTaskInfo task used to report failures (code is stored, then a long jump is taken)
 * @return the timestamp array of the block, or NULL when the block has no column array, its data is not
 *         loaded, or the column carries no data
 */
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  TSKEY* tsCols = NULL;

  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return tsCols;
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  if (!pColDataInfo) {
    // read terrno once so the stored code and the jump value cannot differ
    int32_t code = terrno;
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  tsCols = (int64_t*)pColDataInfo->pData;
  if (tsCols == NULL || pBlock->info.rows <= 0) {
    // nothing to inspect: tsCols[0] and tsCols[rows - 1] below would be invalid reads
    return tsCols;
  }

  if (tsCols[0] == 0) {
    qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
          tsCols[pBlock->info.rows - 1]);
  }

  if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
    int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
963

964
/**
 * @brief Open the interval operator: consume every downstream block and build the grouped result info.
 *
 * For each block the optional scalar expressions are applied first, the block is bound as aggregate
 * input, and hashIntervalAgg() aggregates it. Consumption stops when the downstream is exhausted or
 * hashIntervalAgg() returns true. Does nothing if the operator is already opened.
 *
 * @param pOperator the interval operator
 * @return TSDB_CODE_SUCCESS; on failure the code is stored in the task and a long jump is taken, so an
 *         error code is not expected to be returned to the caller
 */
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                pSup = &pOperator->exprSupp;

  int32_t scanFlag = MAIN_SCAN;
  int64_t st = taosGetTimestampUs();

  // stays false until the grouped result info has been initialized below
  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
  }

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  OPTR_SET_OPENED(pOperator);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1018

1019
// start a new state window and record the start info
1020
void doKeepNewStateWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList,
2,147,483,647✔
1021
  int32_t rowIndex, uint64_t groupId, const EStateWinExtendOption* extendOption, bool hasPrevWin) {
1022
  pRowSup->groupId = groupId;
2,147,483,647✔
1023
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT ||
2,147,483,647✔
1024
      *extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
1,841,730,732✔
1025
    pRowSup->win.skey = tsList[rowIndex];
1,980,491,446✔
1026
    pRowSup->startRowIndex = rowIndex;
1,980,491,446✔
1027
    pRowSup->numOfRows = 0;  // does not include the current row yet
1,980,491,446✔
1028
  } else {
1029
    pRowSup->startRowIndex = hasContinuousNullRows(pRowSup) ?
2,147,483,647✔
1030
      rowIndex - pRowSup->numNullRows : rowIndex;
1,180,612,174✔
1031
    pRowSup->win.skey = hasPrevWin ?
1,180,612,174✔
1032
                        pRowSup->win.ekey + 1 : tsList[pRowSup->startRowIndex];
1,180,612,174✔
1033
    pRowSup->numOfRows = rowIndex - pRowSup->startRowIndex;
1,180,612,174✔
1034
  }
1035
  resetNumNullRows(pRowSup);
2,147,483,647✔
1036
}
2,147,483,647✔
1037

1038
// close a state window and record its end info
1039
// this functions is called when a new state row appears
1040
// @param rowIndex the index of the first row of next window
1041
void doKeepCurStateWindowEndInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, 
2,147,483,647✔
1042
                                 int32_t rowIndex,
1043
                                 const EStateWinExtendOption* extendOption,
1044
                                 bool hasNextWin) {
1045
  if (*extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
2,147,483,647✔
1046
      pRowSup->win.ekey = hasNextWin?
661,413,788✔
1047
                          tsList[rowIndex] - 1 : pRowSup->prevTs;
661,413,788✔
1048
      // continuous rows having null state col should be included in this window
1049
      pRowSup->numOfRows += hasContinuousNullRows(pRowSup) ?
1,322,827,576✔
1050
        pRowSup->numNullRows : 0;
661,413,788✔
1051
      resetNumNullRows(pRowSup);
661,413,788✔
1052
  }
1053
}
2,147,483,647✔
1054

1055
/**
 * @brief Account for a row whose state column is null.
 * @param pRowSup   row bookkeeping; its null-row counter is incremented and prevTs is updated
 * @param nullRowTs timestamp of the null row
 */
void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, TSKEY nullRowTs) {
  pRowSup->prevTs = nullRowTs;
  ++pRowSup->numNullRows;
}
877,328,237✔
1059

1060
/**
1061
  @brief Process the closed state window and do aggregation on the tuples
1062
  within the window. Partial results are stored in the output buffer. If window
1063
  has no valid rows, return success.
1064
*/
1065
static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo,
2,147,483,647✔
1066
                                        SWindowRowsSup* pRowSup,
1067
                                        SExecTaskInfo* pTaskInfo,
1068
                                        SExprSupp* pSup,
1069
                                        int32_t numOfOutput) {
1070
  if (pRowSup->numOfRows == 0) {
2,147,483,647✔
1071
    // no valid rows in the window
1072
    return TSDB_CODE_SUCCESS;
16,091,614✔
1073
  }
1074
  int32_t     code = TSDB_CODE_SUCCESS;
2,147,483,647✔
1075
  int32_t     lino = 0;
2,147,483,647✔
1076
  SResultRow* pResult = NULL;
2,147,483,647✔
1077
  code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win,
2,147,483,647✔
1078
    true, &pResult, pRowSup->groupId, pSup->pCtx, numOfOutput,
2,147,483,647✔
1079
    pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1080
  QUERY_CHECK_CODE(code, lino, _return);
2,147,483,647✔
1081

1082
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
2,147,483,647✔
1083
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx,
2,147,483,647✔
1084
    &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1085
    pRowSup->numOfRows, 0, numOfOutput);
1086
  QUERY_CHECK_CODE(code, lino, _return);
2,147,483,647✔
1087

1088
_return:
2,147,483,647✔
1089
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
1090
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
427✔
1091
  }
1092
  return code;
2,147,483,647✔
1093
}
1094

1095
// process a data block for state window aggregation
1096
// scan from startIndex to endIndex
1097
// numPartialCalcRows returns the number of rows that have been
1098
// partially calculated within the block
1099
static void doStateWindowAggImpl(SOperatorInfo* pOperator,
24,843,045✔
1100
                                 SStateWindowOperatorInfo* pInfo,
1101
                                 SSDataBlock* pBlock, int32_t* startIndex,
1102
                                 int32_t* endIndex,
1103
                                 int32_t* numPartialCalcRows) {
1104
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
24,843,045✔
1105
  SExprSupp*     pExprSup = &pOperator->exprSupp;
24,843,045✔
1106

1107
  SColumnInfoData* pStateColInfoData = 
1108
    taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
24,843,045✔
1109
  if (!pStateColInfoData) {
24,843,045✔
1110
    pTaskInfo->code = terrno;
×
1111
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1112
  }
1113
  uint64_t gid = pBlock->info.id.groupId;
24,843,045✔
1114
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
24,843,045✔
1115
  int32_t bytes = pStateColInfoData->info.bytes;
24,843,045✔
1116

1117
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock,
24,843,045✔
1118
                                               pInfo->tsSlotId);
24,843,045✔
1119
  if (NULL == pColInfoData) {
24,843,045✔
1120
    pTaskInfo->code = terrno;
×
1121
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1122
  }
1123
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
24,843,045✔
1124

1125
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ?
24,843,045✔
1126
                                &pBlock->pBlockAgg[pInfo->stateCol.slotId] :
24,843,045✔
1127
                                NULL;
1128
  EStateWinExtendOption  extendOption = pInfo->extendOption;
24,843,045✔
1129
  SWindowRowsSup*        pRowSup = &pInfo->winSup;
24,843,045✔
1130

1131
  if (pRowSup->groupId != gid) {
24,843,045✔
1132
    /*
1133
      group changed, process the previous group's unclosed state window first
1134
    */
1135
    doKeepCurStateWindowEndInfo(pRowSup, tsList, 0, &extendOption, false);
10,777,034✔
1136
    int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
10,777,034✔
1137
                                            pExprSup, numOfOutput);
1138
    if (TSDB_CODE_SUCCESS != code) T_LONG_JMP(pTaskInfo->env, code);
10,777,034✔
1139
    *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
10,777,034✔
1140

1141
    /*
1142
      unhandled null rows should be ignored, since they belong to previous group
1143
    */
1144
    *numPartialCalcRows += pRowSup->numNullRows;
10,777,034✔
1145

1146
    /*
1147
      reset state window info for new group
1148
    */
1149
    pInfo->hasKey = false;
10,777,034✔
1150
    resetWindowRowsSup(pRowSup);
10,777,034✔
1151
  }
1152

1153
  for (int32_t j = *startIndex; j < *endIndex; ++j) {
2,147,483,647✔
1154
    if (pBlock->info.scanFlag != PRE_SCAN) {
2,147,483,647✔
1155
      if (pInfo->winSup.lastTs == INT64_MIN || gid != pRowSup->groupId || !pInfo->hasKey) {
2,147,483,647✔
1156
        pInfo->winSup.lastTs = tsList[j];
369,744,661✔
1157
      } else {
1158
        if (tsList[j] == pInfo->winSup.lastTs) {
2,147,483,647✔
1159
          // forbid duplicated ts rows
1160
          qError("%s:%d duplicated ts found in state window aggregation", __FILE__, __LINE__);
123,199✔
1161
          pTaskInfo->code = TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP;
123,199✔
1162
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP);
123,199✔
1163
        } else {
1164
          pInfo->winSup.lastTs = tsList[j];
2,147,483,647✔
1165
        }
1166
      }
1167
    }
1168
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
2,147,483,647✔
1169
      doKeepStateWindowNullInfo(pRowSup, tsList[j]);
877,328,237✔
1170
      continue;
877,328,237✔
1171
    }
1172
    if (pStateColInfoData->pData == NULL) {
2,147,483,647✔
1173
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1174
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1175
    }
1176
    char* val = colDataGetData(pStateColInfoData, j);
2,147,483,647✔
1177

1178
    if (!pInfo->hasKey) {
2,147,483,647✔
1179
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
6,880,681✔
1180
      pInfo->hasKey = true;
6,880,681✔
1181
      doKeepNewStateWindowStartInfo(
6,880,681✔
1182
        pRowSup, tsList, j, gid, &extendOption, false);
1183
      doKeepTuple(pRowSup, tsList[j], j, gid);
6,880,681✔
1184
    } else if (!compareVal(val, &pInfo->stateKey)) {
2,147,483,647✔
1185
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
2,147,483,647✔
1186
      int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
2,147,483,647✔
1187
                                              pExprSup, numOfOutput);
1188
      if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1189
        T_LONG_JMP(pTaskInfo->env, code);
×
1190
      }
1191
      *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
2,147,483,647✔
1192

1193
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid,
2,147,483,647✔
1194
                                    &extendOption, true);
1195
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1196
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
2,147,483,647✔
1197
    } else {
1198
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1199
    }
1200
  }
1201

1202
  if (!pInfo->hasKey && extendOption != STATE_WIN_EXTEND_OPTION_FORWARD) {
24,719,846✔
1203
    /*
1204
      No valid state rows within the block and we don't care about
1205
      null rows before valid state window, mark them as processed and drop them
1206
    */
1207
    *numPartialCalcRows = pBlock->info.rows;
3,233,694✔
1208
    resetNumNullRows(pRowSup);
3,233,694✔
1209
    return;
3,233,694✔
1210
  }
1211
  if (pRowSup->numOfRows == 0 && 
21,486,152✔
1212
      extendOption != STATE_WIN_EXTEND_OPTION_BACKWARD) {
3,862,199✔
1213
    /*
1214
      If no valid state window or we don't know the belonging of
1215
      null rows in the end of the block, handle them with next block
1216
    */
1217
    return;
3,855,417✔
1218
  }
1219
  doKeepCurStateWindowEndInfo(pRowSup, tsList, *endIndex, &extendOption, false);
17,630,735✔
1220
  int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
17,630,735✔
1221
                                          pExprSup, numOfOutput);
1222
  if (TSDB_CODE_SUCCESS != code) {
17,630,735✔
1223
    pTaskInfo->code = code;
427✔
1224
    T_LONG_JMP(pTaskInfo->env, code);
427✔
1225
  }
1226
  *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
17,630,308✔
1227
  // reset part of pRowSup after doing agg calculation
1228
  pRowSup->startRowIndex = 0;
17,630,308✔
1229
  pRowSup->numOfRows = 0;
17,630,308✔
1230
}
1231

1232
/**
 * @brief Open the state window operator: consume every downstream block and aggregate its windows.
 *
 * Rows that doStateWindowAggImpl() could not finish (numPartialCalcRows is smaller than the row count)
 * are carried over in pUnfinishedBlock and merged with the next downstream block. The carry-over block
 * is either a plain reference to the downstream block (isRef is true) or a copy owned by this function.
 * Does nothing if the operator is already opened.
 *
 * @param pOperator the state window operator
 * @return TSDB_CODE_SUCCESS; on failure the code is stored in the task and a long jump is taken, so an
 *         error code is not expected to be returned to the caller
 */
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = pInfo->binfo.inputTsOrder;
  int64_t    st = taosGetTimestampUs();

  pInfo->cleanGroupResInfo = false;

  SSDataBlock* pUnfinishedBlock = NULL;
  // whether pUnfinishedBlock is a reference to the downstream block (true) or an owned copy (false);
  // kept outside the loop so the error path below knows whether the block must be released
  bool         isRef = false;
  int32_t      startIndex = 0;
  int32_t      endIndex = 0;
  int32_t      numPartialCalcRows = 0;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      if (pUnfinishedBlock != NULL) {
        // a carry-over block that survives an iteration is always an owned copy
        blockDataDestroy(pUnfinishedBlock);
        pUnfinishedBlock = NULL;
        resetWindowRowsSup(&pInfo->winSup);
      }
      break;
    }

    startIndex = 0;
    if (pUnfinishedBlock != NULL) {
      isRef = false;
      startIndex = pUnfinishedBlock->info.rows;
      // merge unfinished block with current block
      code = blockDataMerge(pUnfinishedBlock, pBlock);
      // reset id to current block id
      pUnfinishedBlock->info.id = pBlock->info.id;
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      pUnfinishedBlock = pBlock;
      isRef = true;
    }
    endIndex = pUnfinishedBlock->info.rows;

    pInfo->binfo.pRes->info.scanFlag = pUnfinishedBlock->info.scanFlag;
    code = setInputDataBlock(
      pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pUnfinishedBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that 
    // needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo,
        pUnfinishedBlock, pUnfinishedBlock, pInfo->scalarSup.pCtx,
        pInfo->scalarSup.numOfExprs, NULL,
        GET_STM_RTINFO(pOperator->pTaskInfo));
      // routed through _end so an owned carry-over block is released before the long jump
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doStateWindowAggImpl(pOperator, pInfo, pUnfinishedBlock, 
      &startIndex, &endIndex, &numPartialCalcRows);
    if (numPartialCalcRows < pUnfinishedBlock->info.rows) {
      // save unfinished block for next round processing
      if (isRef) {
        code = createOneDataBlock(pBlock, true, &pUnfinishedBlock);
        QUERY_CHECK_CODE(code, lino, _end);
        // from here on the carry-over block is a copy owned by this function
        isRef = false;
      }
      code = blockDataTrimFirstRows(pUnfinishedBlock, numPartialCalcRows);
      // check the trim result itself; a failure here used to be silently dropped
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      if (!isRef) {
        blockDataDestroy(pUnfinishedBlock);
      }
      pUnfinishedBlock = NULL;
    }
    numPartialCalcRows = 0;
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  code = initGroupedResultInfo(
    &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;
  pOperator->status = OP_RES_TO_RETURN;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // release the carry-over block if this function owns it; a referenced block belongs to downstream
    if (!isRef && pUnfinishedBlock != NULL) {
      blockDataDestroy(pUnfinishedBlock);
      pUnfinishedBlock = NULL;
    }
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1334

1335
/*
 * Produce the next output block of the state-window operator.
 *
 * Runs the operator's open function, then repeatedly builds and filters a
 * result block until either a non-empty block is available or the grouped
 * results are exhausted (in which case the operator is marked completed).
 *
 * @param pOperator  the state-window operator
 * @param ppRes      out: the result block, or NULL when it holds no rows
 * @return TSDB_CODE_SUCCESS; on failure the error is stored in the task and
 *         control leaves through T_LONG_JMP.
 */
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (OP_EXEC_DONE == pOperator->status) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SStateWindowOperatorInfo* pStateInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SOptrBasicInfo*           pBasic = &pStateInfo->binfo;
  bool                      stop = false;
  int32_t                   lino = 0;
  int32_t                   code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  do {
    doBuildResultDatablock(pOperator, pBasic, &pStateInfo->groupResInfo, pStateInfo->aggSup.pResultBuf);
    code = doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (hasRemainResults(&pStateInfo->groupResInfo)) {
      // more results are pending: stop only once the filter left some rows
      stop = (pBasic->pRes->info.rows > 0);
    } else {
      setOperatorCompleted(pOperator);
      stop = true;
    }
  } while (!stop);

  pOperator->resultInfo.totalRows += pBasic->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pBasic->pRes->info.rows == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pBasic->pRes;
  }
  return code;
}
1380

1381
/*
 * Produce the next output block of the interval operator.
 *
 * Runs the operator's open function, then repeatedly builds and filters a
 * result block until either a non-empty block is available or the grouped
 * results are exhausted (in which case the operator is marked completed).
 *
 * @param pOperator  the interval operator
 * @param ppRes      out: the result block, or NULL when it holds no rows
 * @return TSDB_CODE_SUCCESS; on failure the error is stored in the task and
 *         control leaves through T_LONG_JMP.
 */
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  // Declared ahead of every "goto _end" so the read after the label never
  // sees a variable whose initializer was jumped over.
  size_t                    rows = 0;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pBlock = pInfo->binfo.pRes;
  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  while (1) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
    if (!hasRemain) {
      setOperatorCompleted(pOperator);
      break;
    }

    // keep pulling while the filter removed every row of the current batch
    if (pBlock->info.rows > 0) {
      break;
    }
  }

  rows = pBlock->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pBlock;
  return code;
}
1424

1425
static void destroyStateWindowOperatorInfo(void* param) {
2,995,360✔
1426
  if (param == NULL) {
2,995,360✔
1427
    return;
×
1428
  }
1429
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
2,995,360✔
1430
  cleanupBasicInfo(&pInfo->binfo);
2,995,360✔
1431
  taosMemoryFreeClear(pInfo->stateKey.pData);
2,995,360✔
1432
  if (pInfo->pOperator) {
2,995,360✔
1433
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
2,995,360✔
1434
                      pInfo->cleanGroupResInfo);
2,995,360✔
1435
    pInfo->pOperator = NULL;
2,995,360✔
1436
  }
1437

1438
  cleanupExprSupp(&pInfo->scalarSup);
2,995,360✔
1439
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
2,995,360✔
1440
  cleanupAggSup(&pInfo->aggSup);
2,995,360✔
1441
  cleanupGroupResInfo(&pInfo->groupResInfo);
2,995,360✔
1442

1443
  taosMemoryFreeClear(param);
2,995,360✔
1444
}
1445

1446
static void freeItem(void* param) {
31,496✔
1447
  SGroupKeys* pKey = (SGroupKeys*)param;
31,496✔
1448
  taosMemoryFree(pKey->pData);
31,496✔
1449
}
31,496✔
1450

1451
void destroyIntervalOperatorInfo(void* param) {
17,322,460✔
1452
  if (param == NULL) {
17,322,460✔
1453
    return;
×
1454
  }
1455

1456
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
17,322,460✔
1457

1458
  cleanupBasicInfo(&pInfo->binfo);
17,322,460✔
1459
  if (pInfo->pOperator) {
17,319,205✔
1460
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
16,392,569✔
1461
                      pInfo->cleanGroupResInfo);
16,390,244✔
1462
    pInfo->pOperator = NULL;
16,386,573✔
1463
  }
1464

1465
  cleanupAggSup(&pInfo->aggSup);
17,319,719✔
1466
  cleanupExprSupp(&pInfo->scalarSupp);
17,314,033✔
1467

1468
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
17,324,320✔
1469

1470
  taosArrayDestroy(pInfo->pInterpCols);
17,322,868✔
1471
  pInfo->pInterpCols = NULL;
17,317,810✔
1472

1473
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
17,316,415✔
1474
  pInfo->pPrevValues = NULL;
17,310,400✔
1475

1476
  cleanupGroupResInfo(&pInfo->groupResInfo);
17,309,470✔
1477
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
17,316,415✔
1478
  destroyBoundedQueue(pInfo->pBQ);
17,311,901✔
1479
  taosMemoryFreeClear(param);
17,309,984✔
1480
}
1481

1482
/*
 * Create pInfo->pInterpCols and pInfo->pPrevValues and seed each with one
 * entry describing the primary timestamp column.
 *
 * The timestamp entry in pPrevValues is flagged isNull to denote that no
 * value has been assigned yet.
 *
 * @param pInfo  interval operator info whose interpolation arrays are created
 * @return TSDB_CODE_SUCCESS, or the terrno value of the allocation/push that
 *         failed. Arrays already stored in pInfo are left for the caller's
 *         normal cleanup; the key buffer that could not be stored is freed
 *         here.
 */
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   tmp = NULL;

  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);

  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);

  {  // ts column
    SColumn c = {0};
    c.colId = 1;
    c.slotId = pInfo->primaryTsIndex;
    c.type = TSDB_DATA_TYPE_TIMESTAMP;
    c.bytes = sizeof(int64_t);
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    // zero-initialise so that no field is copied into the array uninitialised
    SGroupKeys key = {0};
    key.bytes = c.bytes;
    key.type = c.type;
    key.isNull = true;  // to denote no value is assigned yet
    key.pData = taosMemoryCalloc(1, c.bytes);
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

    tmp = taosArrayPush(pInfo->pPrevValues, &key);
    if (tmp == NULL) {
      // the array did not take ownership of the buffer, release it here
      code = terrno;
      lino = __LINE__;
      taosMemoryFree(key.pData);
      goto _end;
    }
  }
_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1518

1519
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
16,380,538✔
1520
                                      bool* pRes) {
1521
  // the primary timestamp column
1522
  bool    needed = false;
16,380,538✔
1523
  int32_t code = TSDB_CODE_SUCCESS;
16,380,538✔
1524
  int32_t lino = 0;
16,380,538✔
1525
  void*   tmp = NULL;
16,380,538✔
1526

1527
  for (int32_t i = 0; i < numOfCols; ++i) {
48,813,265✔
1528
    SExprInfo* pExpr = pCtx[i].pExpr;
32,496,343✔
1529
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
32,483,123✔
1530
      needed = true;
15,748✔
1531
      break;
15,748✔
1532
    }
1533
  }
1534

1535
  if (needed) {
16,332,670✔
1536
    code = initWindowInterpPrevVal(pInfo);
15,748✔
1537
    QUERY_CHECK_CODE(code, lino, _end);
15,748✔
1538
  }
1539

1540
  for (int32_t i = 0; i < numOfCols; ++i) {
48,705,293✔
1541
    SExprInfo* pExpr = pCtx[i].pExpr;
32,433,809✔
1542

1543
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
32,423,557✔
1544
      SFunctParam* pParam = &pExpr->base.pParam[0];
15,748✔
1545

1546
      SColumn c = *pParam->pCol;
15,748✔
1547
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
15,748✔
1548
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
15,748✔
1549

1550
      SGroupKeys key = {0};
15,748✔
1551
      key.bytes = c.bytes;
15,748✔
1552
      key.type = c.type;
15,748✔
1553
      key.isNull = false;
15,748✔
1554
      key.pData = taosMemoryCalloc(1, c.bytes);
15,748✔
1555
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
15,748✔
1556

1557
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
15,748✔
1558
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
15,748✔
1559
    }
1560
  }
1561

1562
_end:
16,271,484✔
1563
  if (code != TSDB_CODE_SUCCESS) {
16,291,843✔
1564
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1565
  }
1566
  *pRes = needed;
16,291,843✔
1567
  return code;
16,362,169✔
1568
}
1569

1570
/*
 * Bring an interval operator back to the OP_NOT_OPENED state so that it can be
 * executed again.
 *
 * Result rows, the time-window column, the aggregate support, the scalar
 * expression support, the open-window list, the interpolation arrays, the
 * grouped result info and the bounded queue are all cleaned and/or rebuilt.
 * Every cleanup step is executed even when an earlier re-initialisation
 * failed.
 *
 * @param pOper          the operator being reset
 * @param pIntervalInfo  its interval operator info
 * @return 0 on success, otherwise the FIRST error reported by a
 *         re-initialisation step.
 */
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo) {
  SExecTaskInfo*      pTaskInfo = pOper->pTaskInfo;
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pIntervalInfo->binfo);
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp,
                    &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup, pIntervalInfo->cleanGroupResInfo);

  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                       &pTaskInfo->storageAPI.functionStore);
  }
  if (code == 0) {
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL) {
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
  }

  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
    pIntervalInfo->curGroupId = UINT64_MAX;
  }

  pIntervalInfo->cleanGroupResInfo = false;
  pIntervalInfo->handledGroupNum = 0;
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;

  taosArrayDestroy(pIntervalInfo->pInterpCols);
  pIntervalInfo->pInterpCols = NULL;

  if (pIntervalInfo->pPrevValues != NULL) {
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
    pIntervalInfo->pPrevValues = NULL;
    // NOTE(review): only the timestamp entry is recreated here; the
    // per-function interpolation columns are not re-added - confirm that this
    // is intended.
    int32_t ret = initWindowInterpPrevVal(pIntervalInfo);
    if (code == 0) {
      // do not let a successful re-init hide an earlier failure
      code = ret;
    }
  }

  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
  destroyBoundedQueue(pIntervalInfo->pBQ);
  pIntervalInfo->pBQ = NULL;
  return code;
}
1618

1619
/* Reset-state callback of the interval operator: forwards to resetInterval(). */
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
  return resetInterval(pOper, (SIntervalAggOperatorInfo*)pOper->info);
}
1623

1624
/*
 * Create the hash interval aggregation operator ("TimeIntervalAggOperator").
 *
 * @param downstream  the single downstream operator
 * @param pPhyNode    physical plan node describing the interval window
 * @param pTaskInfo   owning task; pTaskInfo->code is set when creation fails
 * @param pOptrInfo   out: the created operator (set only on success)
 * @return TSDB_CODE_SUCCESS or an error code. On failure the partially built
 *         operator info is destroyed and destroyOperatorAndDownstreams() is
 *         invoked for the operator and the downstream.
 */
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  pOperator->pPhyNode = pPhyNode;
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pSup->hasWindowOrGroup = true;
  pSup->hasWindow = true;

  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 512);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                    &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  SInterval interval = {.interval = pPhyNode->interval,
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
                        .timeRange = pPhyNode->timeRange};
  calcIntervalAutoOffset(&interval);

  STimeWindowAggSupp as = {
      .maxTs = INT64_MIN,
  };

  pInfo->win = pTaskInfo->window;
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
  pInfo->interval = interval;
  pInfo->twAggSup = as;
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
    pInfo->limited = true;
    // the stored limit also covers the rows skipped by the offset
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
  }
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
    pInfo->slimited = true;
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
    pInfo->curGroupId = UINT64_MAX;
  }

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->timeWindowInterpo = false;
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
  QUERY_CHECK_CODE(code, lino, _error);
  if (pInfo->timeWindowInterpo) {
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
      // "code" is still success at this point; without an explicit error the
      // function would report success while *pOptrInfo stays unset.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
      goto _error;
    }
  }

  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);

  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }

  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
1752

1753
// todo handle multiple timeline cases. assume no timeline interweaving
1754
/*
 * Assign the rows of one input block to session windows and apply the
 * aggregate functions to them.
 *
 * A row continues the current window when it belongs to the same group and
 * its timestamp is within pInfo->gap of the previously kept timestamp (in
 * either direction); otherwise the rows collected so far are aggregated into
 * their window and a new window is started. The rows still pending when the
 * block ends are aggregated into the current window as well.
 *
 * Failures leave the function through T_LONG_JMP on the task environment.
 *
 * @param pOperator  the session operator
 * @param pInfo      its operator info
 * @param pBlock     the input block; an empty block is ignored
 */
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (!pColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int64_t gid = pBlock->info.id.groupId;

  int64_t gap = pInfo->gap;

  if (!pInfo->reptScan) {
    pInfo->reptScan = true;
    pInfo->winSup.prevTs = INT64_MIN;
  }

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  if (pBlock->info.rows <= 0) {
    // nothing to aggregate; tsList[rows - 1] below would be out of bounds
    return;
  }

  // In case of ascending or descending order scan data, only one time window needs to be kept for each table.
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], j, gid);
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
      doKeepTuple(pRowSup, tsList[j], j, gid);
    } else {  // start a new session window
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
        SResultRow* pResult = NULL;

        // keep the time window for the closed time window.
        STimeWindow window = pRowSup->win;

        int32_t ret =
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
          T_LONG_JMP(pTaskInfo->env, ret);
        }

        // pRowSup->numOfRows rows starting at pRowSup->startRowIndex belong to the window being closed
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
        ret =
            applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
        if (ret != TSDB_CODE_SUCCESS) {
          T_LONG_JMP(pTaskInfo->env, ret);
        }
      }

      // here we start a new session window
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], j, gid);
    }
  }

  // aggregate the rows that are still pending at the end of the block
  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
1835

1836
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
6,219,770✔
1837
  if (pOperator->status == OP_EXEC_DONE) {
6,219,770✔
1838
    (*ppRes) = NULL;
1,493,458✔
1839
    return TSDB_CODE_SUCCESS;
1,493,458✔
1840
  }
1841

1842
  int32_t                  code = TSDB_CODE_SUCCESS;
4,726,312✔
1843
  int32_t                  lino = 0;
4,726,312✔
1844
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
4,726,312✔
1845
  SSessionAggOperatorInfo* pInfo = pOperator->info;
4,726,312✔
1846
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
4,726,312✔
1847
  SExprSupp*               pSup = &pOperator->exprSupp;
4,726,312✔
1848

1849
  if (pOperator->status == OP_RES_TO_RETURN) {
4,726,312✔
1850
    while (1) {
×
1851
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,599,428✔
1852
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,599,428✔
1853
      QUERY_CHECK_CODE(code, lino, _end);
1,599,428✔
1854

1855
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,599,428✔
1856
      if (!hasRemain) {
1,599,428✔
1857
        setOperatorCompleted(pOperator);
16,491✔
1858
        break;
16,491✔
1859
      }
1860

1861
      if (pBInfo->pRes->info.rows > 0) {
1,582,937✔
1862
        break;
1,582,937✔
1863
      }
1864
    }
1865
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1,599,428✔
1866
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1,599,428✔
1867
    return code;
1,599,428✔
1868
  }
1869

1870
  int64_t st = taosGetTimestampUs();
3,126,884✔
1871
  int32_t order = pInfo->binfo.inputTsOrder;
3,126,884✔
1872

1873
  SOperatorInfo* downstream = pOperator->pDownstream[0];
3,126,884✔
1874

1875
  pInfo->cleanGroupResInfo = false;
3,126,884✔
1876
  while (1) {
9,364,631✔
1877
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
12,491,515✔
1878
    if (pBlock == NULL) {
12,491,515✔
1879
      break;
3,126,884✔
1880
    }
1881

1882
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
9,364,631✔
1883
    if (pInfo->scalarSupp.pExprInfo != NULL) {
9,364,631✔
1884
      SExprSupp* pExprSup = &pInfo->scalarSupp;
1,380✔
1885
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
1,380✔
1886
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
1,380✔
1887
      QUERY_CHECK_CODE(code, lino, _end);
1,380✔
1888
    }
1889
    // the pDataBlock are always the same one, no need to call this again
1890
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
9,364,631✔
1891
    QUERY_CHECK_CODE(code, lino, _end);
9,364,631✔
1892

1893
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
9,364,631✔
1894
    QUERY_CHECK_CODE(code, lino, _end);
9,364,631✔
1895

1896
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
9,364,631✔
1897
  }
1898

1899
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
3,126,884✔
1900

1901
  // restore the value
1902
  pOperator->status = OP_RES_TO_RETURN;
3,126,884✔
1903

1904
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
3,126,884✔
1905
  QUERY_CHECK_CODE(code, lino, _end);
3,126,884✔
1906
  pInfo->cleanGroupResInfo = true;
3,126,884✔
1907

1908
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
3,126,884✔
1909
  QUERY_CHECK_CODE(code, lino, _end);
3,126,884✔
1910
  while (1) {
×
1911
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
3,126,884✔
1912
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
3,126,884✔
1913
    QUERY_CHECK_CODE(code, lino, _end);
3,126,884✔
1914

1915
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
3,126,884✔
1916
    if (!hasRemain) {
3,126,884✔
1917
      setOperatorCompleted(pOperator);
3,060,552✔
1918
      break;
3,060,552✔
1919
    }
1920

1921
    if (pBInfo->pRes->info.rows > 0) {
66,332✔
1922
      break;
66,332✔
1923
    }
1924
  }
1925
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
3,126,884✔
1926

1927
_end:
3,126,884✔
1928
  if (code != TSDB_CODE_SUCCESS) {
3,126,884✔
1929
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1930
    pTaskInfo->code = code;
×
1931
    T_LONG_JMP(pTaskInfo->env, code);
×
1932
  }
1933
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
3,126,884✔
1934
  return code;
3,126,884✔
1935
}
1936

1937
/*
 * Reset-state callback of the state-window operator: returns the operator to
 * OP_NOT_OPENED, rebuilds the time-window column, the aggregate support and
 * the scalar expression support, and clears the remembered state key.
 *
 * @return 0 on success, otherwise the error of the first failing
 *         re-initialisation step (the remaining plain resets still run).
 */
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
  SStateWindowOperatorInfo* pStateInfo = pOper->info;
  SExecTaskInfo*            pTask = pOper->pTaskInfo;
  SStateWindowPhysiNode*    pNode = (SStateWindowPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pStateInfo->binfo);

  SOperatorInfo* pOwner = pStateInfo->pOperator;
  cleanupResultInfo(pOwner->pTaskInfo, &pOwner->exprSupp, &pStateInfo->groupResInfo, &pStateInfo->aggSup,
                    pStateInfo->cleanGroupResInfo);

  colDataDestroy(&pStateInfo->twAggSup.timeWindowData);

  // each re-initialisation step runs only if the previous one succeeded
  int32_t code = initExecTimeWindowInfo(&pStateInfo->twAggSup.timeWindowData, &pTask->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pStateInfo->aggSup, pTask, pNode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTask->id.str, pTask->streamInfo.pState,
                       &pTask->storageAPI.functionStore);
    if (code == 0) {
      code = resetExprSupp(&pStateInfo->scalarSup, pTask, pNode->window.pExprs, NULL,
                           &pTask->storageAPI.functionStore);
    }
  }

  pStateInfo->cleanGroupResInfo = false;
  pStateInfo->hasKey = false;
  pStateInfo->winSup.lastTs = INT64_MIN;
  cleanupGroupResInfo(&pStateInfo->groupResInfo);
  memset(pStateInfo->stateKey.pData, 0, pStateInfo->stateKey.bytes);

  return code;
}
1966

1967
// TODO: make this a non-blocking operator
1968
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhysiNode* pStateNode,
2,995,360✔
1969
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1970
  QRY_PARAM_CHECK(pOptrInfo);
2,995,360✔
1971

1972
  int32_t                   code = TSDB_CODE_SUCCESS;
2,995,360✔
1973
  int32_t                   lino = 0;
2,995,360✔
1974
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
2,995,360✔
1975
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,995,360✔
1976
  if (pInfo == NULL || pOperator == NULL) {
2,995,360✔
1977
    code = terrno;
×
1978
    goto _error;
×
1979
  }
1980

1981
  pOperator->pPhyNode = pStateNode;
2,995,360✔
1982
  pOperator->exprSupp.hasWindowOrGroup = true;
2,995,360✔
1983
  pOperator->exprSupp.hasWindow = true;
2,995,360✔
1984
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
2,995,360✔
1985
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
2,995,360✔
1986

1987
  if (pStateNode->window.pExprs != NULL) {
2,995,360✔
1988
    int32_t    numOfScalarExpr = 0;
2,343,930✔
1989
    SExprInfo* pScalarExprInfo = NULL;
2,343,930✔
1990
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
2,343,930✔
1991
    QUERY_CHECK_CODE(code, lino, _error);
2,343,930✔
1992

1993
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
2,343,930✔
1994
    if (code != TSDB_CODE_SUCCESS) {
2,343,930✔
1995
      goto _error;
×
1996
    }
1997
  }
1998

1999
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
2,995,360✔
2000
  pInfo->stateKey.type = pInfo->stateCol.type;
2,995,360✔
2001
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
2,995,360✔
2002
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
2,995,360✔
2003
  if (pInfo->stateKey.pData == NULL) {
2,995,360✔
2004
    goto _error;
×
2005
  }
2006
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
2,995,360✔
2007
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
2,995,360✔
2008

2009
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
2,995,360✔
2010
                            pTaskInfo->pStreamRuntimeInfo);
2,995,360✔
2011
  if (code != TSDB_CODE_SUCCESS) {
2,995,360✔
2012
    goto _error;
×
2013
  }
2014

2015
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,995,360✔
2016

2017
  int32_t    num = 0;
2,995,360✔
2018
  SExprInfo* pExprInfo = NULL;
2,995,360✔
2019
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
2,995,360✔
2020
  QUERY_CHECK_CODE(code, lino, _error);
2,995,360✔
2021

2022
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2,995,360✔
2023

2024
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
5,990,720✔
2025
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
2,995,360✔
2026
  if (code != TSDB_CODE_SUCCESS) {
2,995,360✔
2027
    goto _error;
×
2028
  }
2029

2030
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
2,995,360✔
2031
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,995,360✔
2032
  initBasicInfo(&pInfo->binfo, pResBlock);
2,995,360✔
2033
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
2,995,360✔
2034

2035
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
2,995,360✔
2036
  QUERY_CHECK_CODE(code, lino, _error);
2,995,360✔
2037

2038
  pInfo->tsSlotId = tsSlotId;
2,995,360✔
2039
  pInfo->pOperator = pOperator;
2,995,360✔
2040
  pInfo->cleanGroupResInfo = false;
2,995,360✔
2041
  pInfo->extendOption = pStateNode->extendOption;
2,995,360✔
2042
  pInfo->trueForLimit = pStateNode->trueForLimit;
2,995,360✔
2043
  pInfo->winSup.lastTs = INT64_MIN;
2,995,360✔
2044

2045
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
2,995,360✔
2046
                  pTaskInfo);
2047
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
2,995,360✔
2048
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2049
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
2,995,360✔
2050

2051
  code = appendDownstream(pOperator, &downstream, 1);
2,995,360✔
2052
  if (code != TSDB_CODE_SUCCESS) {
2,995,360✔
2053
    goto _error;
×
2054
  }
2055

2056
  *pOptrInfo = pOperator;
2,995,360✔
2057
  return TSDB_CODE_SUCCESS;
2,995,360✔
2058

2059
_error:
×
2060
  if (pInfo != NULL) {
×
2061
    destroyStateWindowOperatorInfo(pInfo);
×
2062
  }
2063

2064
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2065
  pTaskInfo->code = code;
×
2066
  return code;
×
2067
}
2068

2069
void destroySWindowOperatorInfo(void* param) {
3,227,000✔
2070
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
3,227,000✔
2071
  if (pInfo == NULL) {
3,227,000✔
2072
    return;
×
2073
  }
2074

2075
  cleanupBasicInfo(&pInfo->binfo);
3,227,000✔
2076
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
3,227,000✔
2077
  if (pInfo->pOperator) {
3,227,000✔
2078
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
3,227,000✔
2079
                      pInfo->cleanGroupResInfo);
3,227,000✔
2080
    pInfo->pOperator = NULL;
3,227,000✔
2081
  }
2082

2083
  cleanupAggSup(&pInfo->aggSup);
3,227,000✔
2084
  cleanupExprSupp(&pInfo->scalarSupp);
3,227,000✔
2085

2086
  cleanupGroupResInfo(&pInfo->groupResInfo);
3,227,000✔
2087
  taosMemoryFreeClear(param);
3,227,000✔
2088
}
2089

2090
/*
 * Return a session-window operator to its not-yet-opened state so that it can be executed again.
 *
 * Result buffers and the time-window helper column are rebuilt, then the session tracking
 * state is cleared (also when a rebuild step failed).
 *
 * Returns the status of the first rebuild step that failed, or 0 when all succeeded.
 */
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
  SSessionAggOperatorInfo* pSession = pOper->info;
  SExecTaskInfo*           pTask = pOper->pTaskInfo;
  SSessionWinodwPhysiNode* pNode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
  const size_t             keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;

  pOper->status = OP_NOT_OPENED;

  // Drop whatever the previous run produced.
  resetBasicOperatorState(&pSession->binfo);
  cleanupResultInfo(pSession->pOperator->pTaskInfo, &pSession->pOperator->exprSupp, &pSession->groupResInfo,
                    &pSession->aggSup, pSession->cleanGroupResInfo);
  colDataDestroy(&pSession->twAggSup.timeWindowData);

  // Rebuild the supporting structures; each step only runs if the previous one succeeded.
  int32_t code = initExecTimeWindowInfo(&pSession->twAggSup.timeWindowData, &pTask->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pSession->aggSup, pTask, pNode->window.pFuncs, NULL, keyBufSize,
                       pTask->id.str, pTask->streamInfo.pState, &pTask->storageAPI.functionStore);
    if (code == 0) {
      code = resetExprSupp(&pSession->scalarSupp, pTask, pNode->window.pExprs, NULL, &pTask->storageAPI.functionStore);
    }
  }

  // Session tracking starts over: everything zeroed, no previous timestamp seen yet.
  pSession->cleanGroupResInfo = false;
  pSession->winSup = (SWindowRowsSup){.prevTs = INT64_MIN};
  pSession->reptScan = false;

  cleanupGroupResInfo(&pSession->groupResInfo);
  return code;
}
2120

2121
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
3,226,540✔
2122
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2123
  QRY_PARAM_CHECK(pOptrInfo);
3,226,540✔
2124

2125
  int32_t                  code = TSDB_CODE_SUCCESS;
3,226,540✔
2126
  int32_t                  lino = 0;
3,226,540✔
2127
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
3,226,540✔
2128
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,226,540✔
2129
  if (pInfo == NULL || pOperator == NULL) {
3,226,540✔
2130
    code = terrno;
375✔
2131
    goto _error;
×
2132
  }
2133

2134
  pOperator->pPhyNode = pSessionNode;
3,226,165✔
2135
  pOperator->exprSupp.hasWindowOrGroup = true;
3,226,165✔
2136
  pOperator->exprSupp.hasWindow = true;
3,226,165✔
2137

2138
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3,226,165✔
2139
  initResultSizeInfo(&pOperator->resultInfo, 4096);
3,226,165✔
2140

2141
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
3,227,000✔
2142
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
3,226,540✔
2143
  initBasicInfo(&pInfo->binfo, pResBlock);
3,226,540✔
2144

2145
  int32_t    numOfCols = 0;
3,227,000✔
2146
  SExprInfo* pExprInfo = NULL;
3,227,000✔
2147
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
3,227,000✔
2148
  QUERY_CHECK_CODE(code, lino, _error);
3,227,000✔
2149

2150
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
6,454,000✔
2151
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
3,227,000✔
2152
  QUERY_CHECK_CODE(code, lino, _error);
3,226,540✔
2153

2154
  pInfo->gap = pSessionNode->gap;
3,226,540✔
2155

2156
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
3,226,540✔
2157
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
3,226,540✔
2158
  QUERY_CHECK_CODE(code, lino, _error);
3,226,540✔
2159

2160
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
3,226,540✔
2161
  pInfo->binfo.pRes = pResBlock;
3,227,000✔
2162
  pInfo->winSup.prevTs = INT64_MIN;
3,226,540✔
2163
  pInfo->reptScan = false;
3,226,540✔
2164
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
3,226,540✔
2165
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
3,227,000✔
2166

2167
  if (pSessionNode->window.pExprs != NULL) {
3,227,000✔
2168
    int32_t    numOfScalar = 0;
460✔
2169
    SExprInfo* pScalarExprInfo = NULL;
460✔
2170
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
460✔
2171
    QUERY_CHECK_CODE(code, lino, _error);
460✔
2172

2173
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
460✔
2174
    QUERY_CHECK_CODE(code, lino, _error);
460✔
2175
  }
2176

2177
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
3,227,000✔
2178
                            pTaskInfo->pStreamRuntimeInfo);
3,226,540✔
2179
  QUERY_CHECK_CODE(code, lino, _error);
3,226,540✔
2180

2181
  pInfo->pOperator = pOperator;
3,226,540✔
2182
  pInfo->cleanGroupResInfo = false;
3,227,000✔
2183
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
3,227,000✔
2184
                  pInfo, pTaskInfo);
2185
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
3,227,000✔
2186
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2187
  pOperator->pTaskInfo = pTaskInfo;
3,226,540✔
2188
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
3,226,540✔
2189

2190
  code = appendDownstream(pOperator, &downstream, 1);
3,226,540✔
2191
  QUERY_CHECK_CODE(code, lino, _error);
3,227,000✔
2192

2193
  *pOptrInfo = pOperator;
3,227,000✔
2194
  return TSDB_CODE_SUCCESS;
3,227,000✔
2195

2196
_error:
×
2197
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2198
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2199
  pTaskInfo->code = code;
×
2200
  return code;
×
2201
}
2202

2203
void destroyMAIOperatorInfo(void* param) {
1,638,597✔
2204
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
1,638,597✔
2205
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
1,638,597✔
2206
  taosMemoryFreeClear(param);
1,638,597✔
2207
}
1,638,597✔
2208

2209
/*
 * Obtain a fresh result row from the aggregate supporter's result buffer and record its
 * position as the current row of pResultRowInfo.
 *
 * Returns the new row, or NULL when getNewResultRow() fails (the current position is then
 * left untouched).
 */
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
  SResultRow* pRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  if (pRow != NULL) {
    pResultRowInfo->cur = (SResultRowPosition){.pageId = pRow->pageId, .offset = pRow->offset};
  }
  return pRow;
}
2217

2218
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
900,666,944✔
2219
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2220
  if (*pResult == NULL) {
900,666,944✔
2221
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
1,545,253✔
2222
    if (*pResult == NULL) {
1,545,253✔
2223
      return terrno;
×
2224
    }
2225
  }
2226

2227
  // set time window for current result
2228
  (*pResult)->win = (*win);
900,670,346✔
2229
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
900,670,724✔
2230
}
2231

2232
/*
 * Aggregate one input block for the merge-aligned-interval operator.
 *
 * Rows are consumed in runs that share the same timestamp. Each run is applied to the single
 * reusable result row; whenever the timestamp changes, the finished row is finalized into
 * pResultBlock and the row is reset for the next window. The last run of the block stays
 * open (miaInfo->curTs remembers its timestamp) so that it can be continued by the next block.
 *
 * Errors are reported by jumping out through T_LONG_JMP on pTaskInfo->env.
 */
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
  SInterval*     pInterval = &iaInfo->interval;

  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);

  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

  if (miaInfo->curTs != INT64_MIN) {
    // A window is still open from the previous block; close it if this block starts a new one.
    if (ts != miaInfo->curTs) {
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
      miaInfo->curTs = ts;
    }
  } else {
    miaInfo->curTs = ts;
  }

  STimeWindow win = {0};
  win.skey = miaInfo->curTs;
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  int32_t currPos = startPos;

  STimeWindow currWin = win;
  while (++currPos < pBlock->info.rows) {
    if (tsCols[currPos] == miaInfo->curTs) {
      continue;
    }

    // Timestamp changed: flush rows [startPos, currPos) into the current window ...
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // ... emit it, and start the next window at the new timestamp.
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
    miaInfo->curTs = tsCols[currPos];

    currWin.skey = miaInfo->curTs;
    currWin.ekey =
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

    startPos = currPos;
    // Bind the row to the window that is actually being opened (currWin), not to the block's
    // first window, so the row's recorded window matches the data accumulated into it.
    ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  // Accumulate the trailing run; its window stays open for the next block.
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
5,211,390✔
2305

2306
/*
 * Called once a group's results have been written to pRes: stamp the block with the finished
 * group's id, then clear the "current group" and "open window" markers of the operator.
 */
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
  // the group id must be copied out before it is cleared below
  pRes->info.id.groupId = pMiaInfo->groupId;

  pMiaInfo->groupId = 0;
  pMiaInfo->curTs = INT64_MIN;  // no window is open any more
}
3,969,201✔
2311

2312
/*
 * Pull blocks from the downstream operator and aggregate them into the operator's result
 * block until one of the following happens:
 *   - the downstream is exhausted (the last open window is flushed and the operator is
 *     marked completed);
 *   - a block of a different group arrives (the finished group is flushed, the new block is
 *     kept in prefetchedBlock for the next call, and the loop stops if any rows survive the
 *     filter);
 *   - the result block has reached the operator's result capacity.
 *
 * Errors are reported by jumping out through T_LONG_JMP on pTaskInfo->env.
 */
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
  int32_t                               code = TSDB_CODE_SUCCESS;
  int32_t                               lino = 0;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;

  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;

  while (1) {
    SSDataBlock* pBlock = NULL;
    if (pMiaInfo->prefetchedBlock == NULL) {
      pBlock = getNextBlockFromDownstream(pOperator, 0);
    } else {
      // resume with the block that ended the previous group
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;

      pMiaInfo->groupId = pBlock->info.id.groupId;
    }

    // no data exists, all query processing is done
    if (pBlock == NULL) {
      // close the last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      setOperatorCompleted(pOperator);
      break;
    }

    if (pMiaInfo->groupId == 0) {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    } else {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        // A new group starts while a previous one is active: that group must have an open
        // window to close. If it does not, the operator state is inconsistent.
        if (pMiaInfo->curTs == INT64_MIN) {
          // Jump with the same error that is recorded on the task; terrno is unrelated to
          // this condition and may be zero or stale.
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          pTaskInfo->code = code;
          T_LONG_JMP(pTaskInfo->env, code);
        }
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));

        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
        if (pRes->info.rows == 0) {
          // After filtering the last group the result is empty, so continue with the next group
          continue;
        } else {
          break;
        }
      } else {
        // same group as before: keep accumulating
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doMergeAlignedIntervalAggImpl(pOperator, pResultRowInfo, pBlock, pRes);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
4,847,765✔
2402

2403
/*
 * "Get next" entry point of the merge-aligned-interval operator.
 *
 * Clears the operator's result block and refills it. When mergeResultBlock is set, the
 * aggregation step is repeated until the operator is done or the block holds at least
 * resultInfo.threshold rows; otherwise a single aggregation step is run.
 *
 * *ppRes receives the result block, or NULL when it is empty or the operator had already
 * finished. The return value is always TSDB_CODE_SUCCESS; failures inside the aggregation
 * step leave via long jump instead.
 */
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                               code = TSDB_CODE_SUCCESS;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pInterval = pMiaInfo->intervalAggOperatorInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pOut = pInterval->binfo.pRes;
  blockDataCleanup(pOut);

  if (!pInterval->binfo.mergeResultBlock) {
    doMergeAlignedIntervalAgg(pOperator);
  } else {
    // keep accumulating until enough rows are collected or the input is exhausted
    while (pOperator->status != OP_EXEC_DONE && pOut->info.rows < pOperator->resultInfo.threshold) {
      doMergeAlignedIntervalAgg(pOperator);
    }
  }

  size_t numOfRows = pOut->info.rows;
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pOut;
  }
  return code;
}
2437

2438
/*
 * Return a merge-aligned-interval operator to its initial state so that it can be executed
 * again: forget the current group, the open window, any prefetched block and the cached
 * result row, then reset the embedded interval operator state.
 *
 * Returns the status of resetInterval().
 */
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;

  pInfo->groupId = 0;
  pInfo->curTs = INT64_MIN;
  pInfo->prefetchedBlock = NULL;
  pInfo->pResultRow = NULL;

  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
}
2453

2454
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
1,638,597✔
2455
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2456
  QRY_PARAM_CHECK(pOptrInfo);
1,638,597✔
2457

2458
  int32_t                               code = TSDB_CODE_SUCCESS;
1,638,597✔
2459
  int32_t                               lino = 0;
1,638,597✔
2460
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
1,638,597✔
2461
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,638,597✔
2462
  if (miaInfo == NULL || pOperator == NULL) {
1,638,597✔
2463
    code = terrno;
×
2464
    goto _error;
×
2465
  }
2466

2467
  pOperator->pPhyNode = pNode;
1,638,597✔
2468
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
1,638,597✔
2469
  if (miaInfo->intervalAggOperatorInfo == NULL) {
1,638,597✔
2470
    code = terrno;
×
2471
    goto _error;
×
2472
  }
2473

2474
  SInterval interval = {.interval = pNode->interval,
4,915,791✔
2475
                        .sliding = pNode->sliding,
1,638,597✔
2476
                        .intervalUnit = pNode->intervalUnit,
1,638,597✔
2477
                        .slidingUnit = pNode->slidingUnit,
1,638,597✔
2478
                        .offset = pNode->offset,
1,638,597✔
2479
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
1,638,597✔
2480
                        .timeRange = pNode->timeRange};
2481
  calcIntervalAutoOffset(&interval);
1,638,597✔
2482

2483
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
1,638,597✔
2484
  SExprSupp*                pSup = &pOperator->exprSupp;
1,638,597✔
2485
  pSup->hasWindowOrGroup = true;
1,638,597✔
2486
  pSup->hasWindow = true;
1,638,597✔
2487

2488
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,638,597✔
2489
                            pTaskInfo->pStreamRuntimeInfo);
1,638,597✔
2490
  QUERY_CHECK_CODE(code, lino, _error);
1,638,597✔
2491

2492
  miaInfo->curTs = INT64_MIN;
1,638,597✔
2493
  iaInfo->win = pTaskInfo->window;
1,638,597✔
2494
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
1,638,597✔
2495
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
1,638,597✔
2496
  iaInfo->interval = interval;
1,638,597✔
2497
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
1,638,597✔
2498
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
1,638,597✔
2499

2500
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,638,597✔
2501
  initResultSizeInfo(&pOperator->resultInfo, 512);
1,638,597✔
2502

2503
  int32_t    num = 0;
1,638,597✔
2504
  SExprInfo* pExprInfo = NULL;
1,638,597✔
2505
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
1,638,597✔
2506
  QUERY_CHECK_CODE(code, lino, _error);
1,638,597✔
2507

2508
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
3,277,194✔
2509
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
1,638,597✔
2510
  QUERY_CHECK_CODE(code, lino, _error);
1,638,597✔
2511

2512
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
1,638,597✔
2513
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,638,597✔
2514
  initBasicInfo(&iaInfo->binfo, pResBlock);
1,638,597✔
2515
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
1,638,597✔
2516
  QUERY_CHECK_CODE(code, lino, _error);
1,638,597✔
2517

2518
  iaInfo->timeWindowInterpo = false;
1,638,597✔
2519
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
1,638,597✔
2520
  QUERY_CHECK_CODE(code, lino, _error);
1,638,597✔
2521
  if (iaInfo->timeWindowInterpo) {
1,638,597✔
2522
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2523
  }
2524

2525
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
1,638,597✔
2526
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
1,638,597✔
2527
  QUERY_CHECK_CODE(code, lino, _error);
1,638,597✔
2528
  iaInfo->pOperator = pOperator;
1,638,597✔
2529
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
1,638,597✔
2530
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2531

2532
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
1,638,597✔
2533
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2534
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
1,638,597✔
2535

2536
  code = appendDownstream(pOperator, &downstream, 1);
1,638,597✔
2537
  QUERY_CHECK_CODE(code, lino, _error);
1,638,597✔
2538

2539
  *pOptrInfo = pOperator;
1,638,597✔
2540
  return TSDB_CODE_SUCCESS;
1,638,597✔
2541

2542
_error:
×
2543
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2544
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2545
  pTaskInfo->code = code;
×
2546
  return code;
×
2547
}
2548

2549
//=====================================================================================================================
2550
// merge interval operator
2551
typedef struct SMergeIntervalAggOperatorInfo {
2552
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2553
  SList*                   groupIntervals;
2554
  SListIter                groupIntervalsIter;
2555
  bool                     hasGroupId;
2556
  uint64_t                 groupId;
2557
  SSDataBlock*             prefetchedBlock;
2558
  bool                     inputBlocksFinished;
2559
} SMergeIntervalAggOperatorInfo;
2560

2561
typedef struct SGroupTimeWindow {
2562
  uint64_t    groupId;
2563
  STimeWindow window;
2564
} SGroupTimeWindow;
2565

2566
void destroyMergeIntervalOperatorInfo(void* param) {
×
2567
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2568
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2569
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2570

2571
  taosMemoryFreeClear(param);
×
2572
}
×
2573

2574
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2575
                                        STimeWindow* newWin) {
2576
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2577
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2578
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2579

2580
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2581
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2582
  if (code != TSDB_CODE_SUCCESS) {
×
2583
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2584
    return code;
×
2585
  }
2586

2587
  SListIter iter = {0};
×
2588
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2589
  SListNode* listNode = NULL;
×
2590
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2591
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2592
    if (prevGrpWin->groupId != tableGroupId) {
×
2593
      continue;
×
2594
    }
2595

2596
    STimeWindow* prevWin = &prevGrpWin->window;
×
2597
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2598
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2599
      taosMemoryFreeClear(tmp);
×
2600
    }
2601
  }
2602

2603
  return TSDB_CODE_SUCCESS;
×
2604
}
2605

2606
/**
 * Aggregate one input block into every interval window it overlaps (merge-interval variant).
 *
 * The first window is derived from the block's start timestamp; the following ones are
 * located with getNextQualifiedWindow() until it returns a negative position. For each
 * window the output buffer is bound, the rows that fall into the window are applied to the
 * aggregate functions, the window is closed and then recorded via outputPrevIntervalResult().
 *
 * This function does not return errors: every failure leaves through T_LONG_JMP on
 * pTaskInfo->env.
 *
 * @param pOperatorInfo   merge-interval operator (info is SMergeIntervalAggOperatorInfo)
 * @param pResultRowInfo  result-row bookkeeping of the operator
 * @param pBlock          input data block
 * @param scanFlag        scan flag of pBlock; compared with MAIN_SCAN when binding buffers
 * @param pResultBlock    forwarded unchanged to outputPrevIntervalResult()
 */
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
                                        iaInfo->binfo.inputTsOrder);

  int32_t code =
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
  if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
    // never unwind with a success code: a missing result row is an error in its own right
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    T_LONG_JMP(pTaskInfo->env, code);
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 iaInfo->binfo.inputTsOrder);
  if (forwardRows <= 0) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  // previous time windows have not been interpolated yet
  if (iaInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore the current time window as the active output buffer
    code = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                  pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup,
                                  pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      // same rule as above; pResult is used right below, so NULL must not slip through
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // window start key interpolation
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                         forwardRows, pBlock->info.rows, numOfOutput);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }
  doCloseWindow(pResultRowInfo, iaInfo, pResult);

  // record this interval (&win) now that it is closed
  code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      iaInfo->binfo.inputTsOrder);
    if (startPos < 0) {
      break;  // no further window in this block
    }

    // bind the output buffer of the next window; a NULL row is treated as a failure
    code =
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, code);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           iaInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                           forwardRows, pBlock->info.rows, numOfOutput);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    doCloseWindow(pResultRowInfo, iaInfo, pResult);

    // record this interval (&nextWin) now that it is closed
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  // keep the last row of the block for interpolating against the next block
  if (iaInfo->timeWindowInterpo) {
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
  }
}
×
2720

2721
/**
 * "Get next" entry point of the merge-interval operator.
 *
 * While input remains, blocks are pulled from downstream 0 (or taken from the block
 * prefetched by the previous call) and fed to doMergeIntervalAggImpl(). The loop stops when
 * the input is exhausted, when a block of a different group arrives (it is parked in
 * prefetchedBlock for the next call), or when the result block reaches the output threshold.
 * Once the input is exhausted, each call takes the next node of groupIntervals and stamps
 * its group id on the result block.
 *
 * @param pOperator  the merge-interval operator
 * @param ppRes      receives the result block, or NULL when it holds no rows
 * @return TSDB_CODE_SUCCESS; failures do not return but leave via T_LONG_JMP after
 *         storing the code in pTaskInfo->code
 */
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  size_t         rows = 0;  // declared up front so no `goto _end` jumps over its initialisation
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!miaInfo->inputBlocksFinished) {
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = getNextBlockFromDownstream(pOperator, 0);
      } else {
        // resume with the block that ended the previous call and switch to its group
        pBlock = miaInfo->prefetchedBlock;
        miaInfo->groupId = pBlock->info.id.groupId;
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
        // input exhausted: position the iterator used by the drain phase below
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
        // group changed: keep the block for the next call and stop here
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

      pRes->info.scanFlag = pBlock->info.scanFlag;
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

    pRes->info.id.groupId = miaInfo->groupId;
  }

  if (miaInfo->inputBlocksFinished) {
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);

    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
      pRes->info.id.groupId = grpWin->groupId;
    }
  }

  if (pRes->info.rows == 0) {
    setOperatorCompleted(pOperator);
  }

  rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pRes;
  return code;
}
2805

2806
/**
 * Reset-state callback of the merge-interval operator.
 *
 * Empties the groupIntervals list, clears the group / prefetch / end-of-input bookkeeping
 * and then delegates to resetInterval() for the embedded interval operator state.
 *
 * @param pOper  the merge-interval operator
 * @return the result of resetInterval()
 */
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
  SMergeIntervalAggOperatorInfo* pMergeInfo = pOper->info;

  tdListEmpty(pMergeInfo->groupIntervals);

  pMergeInfo->inputBlocksFinished = false;
  pMergeInfo->prefetchedBlock = NULL;
  pMergeInfo->groupId = 0;
  pMergeInfo->hasGroupId = false;

  return resetInterval(pOper, &pMergeInfo->intervalAggOperatorInfo);
}
2818

2819
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2820
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2821
  QRY_PARAM_CHECK(pOptrInfo);
×
2822

2823
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2824
  int32_t                        lino = 0;
×
2825
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2826
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2827
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2828
    code = terrno;
×
2829
    goto _error;
×
2830
  }
2831

2832
  pOperator->pPhyNode = pIntervalPhyNode;
×
2833
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2834
                        .sliding = pIntervalPhyNode->sliding,
×
2835
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2836
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2837
                        .offset = pIntervalPhyNode->offset,
×
2838
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2839
                        .timeRange = pIntervalPhyNode->timeRange};
2840
  calcIntervalAutoOffset(&interval);
×
2841

2842
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2843

2844
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2845
  pIntervalInfo->win = pTaskInfo->window;
×
2846
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2847
  pIntervalInfo->interval = interval;
×
2848
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2849
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2850
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2851

2852
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2853
  pExprSupp->hasWindowOrGroup = true;
×
2854
  pExprSupp->hasWindow = true;
×
2855

2856
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2857
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2858

2859
  int32_t    num = 0;
×
2860
  SExprInfo* pExprInfo = NULL;
×
2861
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2862
  QUERY_CHECK_CODE(code, lino, _error);
×
2863

2864
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2865
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2866
  if (code != TSDB_CODE_SUCCESS) {
×
2867
    goto _error;
×
2868
  }
2869

2870
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2871
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2872
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2873
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2874
  QUERY_CHECK_CODE(code, lino, _error);
×
2875

2876
  pIntervalInfo->timeWindowInterpo = false;
×
2877
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2878
  QUERY_CHECK_CODE(code, lino, _error);
×
2879
  if (pIntervalInfo->timeWindowInterpo) {
×
2880
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2881
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2882
      goto _error;
×
2883
    }
2884
  }
2885

2886
  pIntervalInfo->pOperator = pOperator;
×
2887
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2888
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2889
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2890
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2891
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2892
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
2893

2894
  code = appendDownstream(pOperator, &downstream, 1);
×
2895
  if (code != TSDB_CODE_SUCCESS) {
×
2896
    goto _error;
×
2897
  }
2898

2899
  *pOptrInfo = pOperator;
×
2900
  return TSDB_CODE_SUCCESS;
×
2901
_error:
×
2902
  if (pMergeIntervalInfo != NULL) {
×
2903
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2904
  }
2905
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2906
  pTaskInfo->code = code;
×
2907
  return code;
×
2908
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc