• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4696

29 Aug 2025 06:36AM UTC coverage: 58.25% (+0.2%) from 58.041%
#4696

push

travis-ci

web-flow
fix(gpt): fix race-condition in preparing tmp files (#32800)

133424 of 291873 branches covered (45.71%)

Branch coverage included in aggregate %.

5 of 34 new or added lines in 6 files covered. (14.71%)

444 existing lines in 69 files now uncovered.

201767 of 283561 relevant lines covered (71.15%)

17907122.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.22
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,
34
  RESULT_ROW_END_INTERP = 2,
35
} SResultTsInterpType;
36

37
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;
39
  uint64_t           groupId;
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
223,050,502✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
223,050,502✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
223,582,343!
56
    *pResult = NULL;
×
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
×
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
223,600,949✔
63
  *pResult = pResultRow;
223,600,949✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
223,600,949✔
65
}
66

67
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
144,981,205✔
68
  pRowSup->win.ekey = ts;
144,981,205✔
69
  pRowSup->prevTs = ts;
144,981,205✔
70
  pRowSup->numOfRows += 1;
144,981,205✔
71
  pRowSup->groupId = groupId;
144,981,205✔
72
}
144,981,205✔
73

74
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
117,135,085✔
75
  pRowSup->startRowIndex = rowIndex;
117,135,085✔
76
  pRowSup->numOfRows = 0;
117,135,085✔
77
  pRowSup->win.skey = tsList[rowIndex];
117,135,085✔
78
  pRowSup->groupId = groupId;
117,135,085✔
79
}
117,135,085✔
80

81
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
×
82
                                            int32_t order, int64_t* pData) {
83
  int32_t forwardRows = 0;
189,035,002✔
84

85
  if (order == TSDB_ORDER_ASC) {
×
86
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
135,518,043✔
87
    if (end >= 0) {
135,296,081!
88
      forwardRows = end;
135,324,110✔
89

90
      while (pData[end + pos] == ekey) {
136,755,138!
91
        forwardRows += 1;
1,431,028✔
92
        ++pos;
1,431,028✔
93
      }
94
    }
95
  } else {
96
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
53,516,959✔
97
    if (end >= 0) {
53,550,028!
98
      forwardRows = end;
53,561,909✔
99

100
      while (pData[end + pos] == ekey) {
55,673,389!
101
        forwardRows += 1;
2,111,480✔
102
        ++pos;
2,111,480✔
103
      }
104
    }
105
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
106
    //    if (end >= 0) {
107
    //      forwardRows = pos - end;
108
    //
109
    //      if (pData[end] == ekey) {
110
    //        forwardRows += 1;
111
    //      }
112
    //    }
113
  }
114

115
  return forwardRows;
188,846,109✔
116
}
117

118
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
308,761,214✔
119
  int32_t midPos = -1;
308,761,214✔
120
  int32_t numOfRows;
121

122
  if (num <= 0) {
308,761,214!
123
    return -1;
×
124
  }
125

126
  TSKEY*  keyList = (TSKEY*)pValue;
308,761,214✔
127
  int32_t firstPos = 0;
308,761,214✔
128
  int32_t lastPos = num - 1;
308,761,214✔
129

130
  if (order == TSDB_ORDER_DESC) {
308,761,214✔
131
    // find the first position which is smaller than the key
132
    while (1) {
133
      if (key >= keyList[firstPos]) return firstPos;
650,385,238✔
134
      if (key == keyList[lastPos]) return lastPos;
619,970,679✔
135

136
      if (key < keyList[lastPos]) {
619,668,742✔
137
        lastPos += 1;
68,348,643✔
138
        if (lastPos >= num) {
68,348,643!
139
          return -1;
×
140
        } else {
141
          return lastPos;
68,348,643✔
142
        }
143
      }
144

145
      numOfRows = lastPos - firstPos + 1;
551,320,099✔
146
      midPos = (numOfRows >> 1) + firstPos;
551,320,099✔
147

148
      if (key < keyList[midPos]) {
551,320,099✔
149
        firstPos = midPos + 1;
134,138,479✔
150
      } else if (key > keyList[midPos]) {
417,181,620!
151
        lastPos = midPos - 1;
418,196,243✔
152
      } else {
UNCOV
153
        break;
×
154
      }
155
    }
156

157
  } else {
158
    // find the first position which is bigger than the key
159
    while (1) {
160
      if (key <= keyList[firstPos]) return firstPos;
1,441,364,710✔
161
      if (key == keyList[lastPos]) return lastPos;
1,378,971,846✔
162

163
      if (key > keyList[lastPos]) {
1,377,994,829✔
164
        lastPos = lastPos + 1;
145,338,166✔
165
        if (lastPos >= num)
145,338,166!
166
          return -1;
×
167
        else
168
          return lastPos;
145,338,166✔
169
      }
170

171
      numOfRows = lastPos - firstPos + 1;
1,232,656,663✔
172
      midPos = (numOfRows >> 1u) + firstPos;
1,232,656,663✔
173

174
      if (key < keyList[midPos]) {
1,232,656,663✔
175
        lastPos = midPos - 1;
977,539,817✔
176
      } else if (key > keyList[midPos]) {
255,116,846✔
177
        firstPos = midPos + 1;
253,114,195✔
178
      } else {
179
        break;
2,002,651✔
180
      }
181
    }
182
  }
183

184
  return midPos;
988,028✔
185
}
186

187
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
193,408,589✔
188
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
189
  int32_t num = -1;
193,408,589✔
190
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
193,408,589✔
191

192
  if (order == TSDB_ORDER_ASC) {
193,408,589✔
193
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
138,556,520!
194
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
135,407,100!
195
      if (item != NULL) {
135,296,081!
196
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
197
      }
198
    } else {
199
      num = pDataBlockInfo->rows - startPos;
3,149,420✔
200
      if (item != NULL) {
3,149,420!
201
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
202
      }
203
    }
204
  } else {  // desc
205
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
54,852,069!
206
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
53,627,902!
207
      if (item != NULL) {
53,550,028!
208
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
209
      }
210
    } else {
211
      num = pDataBlockInfo->rows - startPos;
1,224,167✔
212
      if (item != NULL) {
1,224,167!
213
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
214
      }
215
    }
216
  }
217

218
  return num;
193,219,696✔
219
}
220

221
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
19,017,121✔
222
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
223
  SqlFunctionCtx* pCtx = pSup->pCtx;
19,017,121✔
224

225
  int32_t index = 1;
19,017,121✔
226
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
40,006,083✔
227
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
20,979,427✔
228
      pCtx[k].start.key = INT64_MIN;
1,959,755✔
229
      continue;
1,959,755✔
230
    }
231

232
    SFunctParam*     pParam = &pCtx[k].param[0];
19,030,228✔
233
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
19,030,228✔
234

235
    double v1 = 0, v2 = 0, v = 0;
18,977,532✔
236
    if (prevRowIndex == -1) {
18,977,532!
237
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
×
238
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
1!
239
    } else {
240
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
19,040,831!
241
                     typeGetTypeModFromColInfo(&pColInfo->info));
242
    }
243

244
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
19,040,832!
245
                   typeGetTypeModFromColInfo(&pColInfo->info));
246

247
#if 0
248
    if (functionId == FUNCTION_INTERP) {
249
      if (type == RESULT_ROW_START_INTERP) {
250
        pCtx[k].start.key = prevTs;
251
        pCtx[k].start.val = v1;
252

253
        pCtx[k].end.key = curTs;
254
        pCtx[k].end.val = v2;
255

256
        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_VARBINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR ||
257
            pColInfo->info.type == TSDB_DATA_TYPE_GEOMETRY) {
258
          if (prevRowIndex == -1) {
259
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
260
          } else {
261
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
262
          }
263

264
          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
265
        }
266
      }
267
    } else if (functionId == FUNCTION_TWA) {
268
#endif
269

270
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
19,040,832✔
271
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
19,040,832✔
272
    SPoint point = (SPoint){.key = windowKey, .val = &v};
19,040,832✔
273

274
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
19,040,832✔
275
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
9,681,154✔
276
    }
277

278
    if (type == RESULT_ROW_START_INTERP) {
19,029,207✔
279
      pCtx[k].start.key = point.key;
9,299,682✔
280
      pCtx[k].start.val = v;
9,299,682✔
281
    } else {
282
      pCtx[k].end.key = point.key;
9,729,525✔
283
      pCtx[k].end.val = v;
9,729,525✔
284
    }
285

286
    index += 1;
19,029,207✔
287
  }
288
#if 0
289
  }
290
#endif
291
}
19,026,656✔
292

293
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
1,084,479✔
294
  if (type == RESULT_ROW_START_INTERP) {
1,084,479✔
295
    for (int32_t k = 0; k < numOfOutput; ++k) {
1,627,739✔
296
      pCtx[k].start.key = INT64_MIN;
871,479✔
297
    }
298
  } else {
299
    for (int32_t k = 0; k < numOfOutput; ++k) {
716,576✔
300
      pCtx[k].end.key = INT64_MIN;
388,357✔
301
    }
302
  }
303
}
1,084,479✔
304

305
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
10,037,287✔
306
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
307
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
10,037,287✔
308

309
  TSKEY curTs = tsCols[pos];
10,037,287✔
310

311
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
10,037,287✔
312
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
10,031,064✔
313

314
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
315
  // start exactly from this point, no need to do interpolation
316
  TSKEY key = ascQuery ? win->skey : win->ekey;
10,031,064✔
317
  if (key == curTs) {
10,031,064✔
318
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
584,730✔
319
    return true;
584,739✔
320
  }
321

322
  // it is the first time window, no need to do interpolation
323
  if (pTsKey->isNull && pos == 0) {
9,446,334!
324
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
161,856✔
325
  } else {
326
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
9,284,478!
327
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
9,284,478✔
328
                              RESULT_ROW_START_INTERP, pSup);
329
  }
330

331
  return true;
9,461,246✔
332
}
333

334
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
10,059,937✔
335
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
336
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
337
  int32_t code = TSDB_CODE_SUCCESS;
10,059,937✔
338
  int32_t lino = 0;
10,059,937✔
339
  int32_t order = pInfo->binfo.inputTsOrder;
10,059,937✔
340

341
  TSKEY actualEndKey = tsCols[endRowIndex];
10,059,937✔
342
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
10,059,937!
343

344
  // not ended in current data block, do not invoke interpolation
345
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
10,059,937!
346
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
174,172✔
347
    (*pRes) = false;
175,744✔
348
    return code;
175,744✔
349
  }
350

351
  // there is actual end point of current time window, no interpolation needs
352
  if (key == actualEndKey) {
9,885,765✔
353
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
152,508✔
354
    (*pRes) = true;
152,508✔
355
    return code;
152,508✔
356
  }
357

358
  if (nextRowIndex < 0) {
9,733,257!
359
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
360
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
361
  }
362

363
  TSKEY nextKey = tsCols[nextRowIndex];
9,733,257✔
364
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
9,733,257✔
365
                            RESULT_ROW_END_INTERP, pSup);
366
  (*pRes) = true;
9,729,461✔
367
  return code;
9,729,461✔
368
}
369

370
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
191,603,176✔
371
  if (pInterval->interval != pInterval->sliding &&
191,603,176✔
372
      ((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart))) {
122,552,234!
373
    return false;
×
374
  }
375

376
  return true;
191,603,176✔
377
}
378

379
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
191,601,969✔
380
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
191,601,969✔
381
}
382

383
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
194,119,306✔
384
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
385
  bool ascQuery = (order == TSDB_ORDER_ASC);
194,119,306✔
386

387
  int32_t precision = pInterval->precision;
194,119,306✔
388
  getNextTimeWindow(pInterval, pNext, order);
194,119,306✔
389

390
  // next time window is not in current block
391
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
193,649,985!
392
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
192,010,305!
393
    return -1;
2,075,460✔
394
  }
395

396
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
191,574,525!
397
    return -1;
×
398
  }
399

400
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
191,570,505✔
401
  int32_t startPos = 0;
191,570,505✔
402

403
  // tumbling time window query, a special case of sliding time window query
404
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
191,570,505!
405
    startPos = prevPosition + 1;
69,097,598✔
406
  } else {
407
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
122,472,907✔
408
      startPos = 0;
2,534,471✔
409
    } else {
410
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
119,938,436✔
411
    }
412
  }
413

414
  /* interp query with fill should not skip time window */
415
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
416
  //    return startPos;
417
  //  }
418

419
  /*
420
   * This time window does not cover any data, try next time window,
421
   * this case may happen when the time window is too small
422
   */
423
  if (primaryKeys != NULL) {
191,477,488✔
424
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
232,558,456✔
425
      TSKEY next = primaryKeys[startPos];
41,090,051✔
426
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
41,090,051✔
427
        pNext->skey = taosTimeTruncate(next, pInterval);
3,649✔
428
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
4✔
429
      } else {
430
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
41,086,402✔
431
        pNext->skey = pNext->ekey - pInterval->interval + 1;
41,086,402✔
432
      }
433
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
150,376,632✔
434
      TSKEY next = primaryKeys[startPos];
15,010,824✔
435
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
15,010,824!
UNCOV
436
        pNext->skey = taosTimeTruncate(next, pInterval);
×
437
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
438
      } else {
439
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
15,011,387✔
440
        pNext->ekey = pNext->skey + pInterval->interval - 1;
15,011,387✔
441
      }
442
    }
443
  }
444

445
  return startPos;
191,479,773✔
446
}
447

448
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
30,117,846✔
449
  if (type == RESULT_ROW_START_INTERP) {
30,117,846✔
450
    return pResult->startInterp == true;
10,044,855✔
451
  } else {
452
    return pResult->endInterp == true;
20,072,991✔
453
  }
454
}
455

456
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
19,921,280✔
457
  if (type == RESULT_ROW_START_INTERP) {
19,921,280✔
458
    pResult->startInterp = true;
10,046,774✔
459
  } else {
460
    pResult->endInterp = true;
9,874,506✔
461
  }
462
}
19,921,280✔
463

464
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
191,567,469✔
465
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
466
  int32_t code = TSDB_CODE_SUCCESS;
191,567,469✔
467
  int32_t lino = 0;
191,567,469✔
468
  if (!pInfo->timeWindowInterpo) {
191,567,469✔
469
    return code;
181,584,201✔
470
  }
471

472
  if (pBlock == NULL) {
9,983,268!
473
    code = TSDB_CODE_INVALID_PARA;
×
474
    return code;
×
475
  }
476

477
  if (pBlock->pDataBlock == NULL) {
9,983,268!
478
    return code;
×
479
  }
480

481
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
9,983,268✔
482

483
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
10,043,453✔
484
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
10,043,453✔
485
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
10,047,044✔
486
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
10,037,332✔
487
    if (interp) {
10,046,518!
488
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
10,046,561✔
489
    }
490
  } else {
491
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
9,712✔
492
  }
493

494
  // point interpolation does not require the end key time window interpolation.
495
  // interpolation query does not generate the time window end interpolation
496
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
10,057,324✔
497
  if (!done) {
10,057,818!
498
    int32_t endRowIndex = startPos + forwardRows - 1;
10,057,949✔
499
    int32_t nextRowIndex = endRowIndex + 1;
10,057,949✔
500

501
    // duplicated ts row does not involve in the interpolation of end value for current time window
502
    int32_t x = endRowIndex;
10,057,949✔
503
    while (x > 0) {
10,058,002✔
504
      if (tsCols[x] == tsCols[x - 1]) {
10,004,846✔
505
        x -= 1;
53✔
506
      } else {
507
        endRowIndex = x;
10,004,793✔
508
        break;
10,004,793✔
509
      }
510
    }
511

512
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
10,057,949!
513
    bool  interp = false;
10,057,949✔
514
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
10,057,949✔
515
                                           win, &interp);
516
    QUERY_CHECK_CODE(code, lino, _end);
10,058,602!
517
    if (interp) {
10,058,602✔
518
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
9,883,021✔
519
    }
520
  } else {
521
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
522
  }
523

524
_end:
10,058,903✔
525
  if (code != TSDB_CODE_SUCCESS) {
10,058,903!
526
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
527
  }
528
  return code;
10,058,902✔
529
}
530

531
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
61,171✔
532
  if (pBlock->pDataBlock == NULL) {
61,171!
533
    return;
×
534
  }
535

536
  size_t num = taosArrayGetSize(pPrevKeys);
61,171✔
537
  for (int32_t k = 0; k < num; ++k) {
183,512✔
538
    SColumn* pc = taosArrayGet(pCols, k);
122,342✔
539

540
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);
122,341✔
541

542
    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
122,341✔
543
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
122,341!
544
      if (colDataIsNull_s(pColInfo, i)) {
244,682!
545
        continue;
×
546
      }
547

548
      char* val = colDataGetData(pColInfo, i);
122,341!
549
      if (IS_VAR_DATA_TYPE(pkey->type)) {
122,341!
550
        if (IS_STR_DATA_BLOB(pkey->type)) {
×
551
          memcpy(pkey->pData, val, blobDataTLen(val));
×
552
        } else {
553
          memcpy(pkey->pData, val, varDataTLen(val));
×
554
        }
555
      } else {
556
        memcpy(pkey->pData, val, pkey->bytes);
122,341✔
557
      }
558

559
      break;
122,341✔
560
    }
561
  }
562
}
563

564
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
61,171✔
565
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
566
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
61,171✔
567

568
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
61,171✔
569
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
61,171✔
570

571
  int32_t startPos = 0;
61,171✔
572
  int32_t numOfOutput = pSup->numOfExprs;
61,171✔
573

574
  SResultRow* pResult = NULL;
61,171✔
575

576
  while (1) {
1✔
577
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
61,172✔
578
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
61,172✔
579
    uint64_t            groupId = pOpenWin->groupId;
61,172✔
580
    SResultRowPosition* p1 = &pOpenWin->pos;
61,172✔
581
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
61,172!
582
      break;
61,171✔
583
    }
584

585
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
1✔
586
    if (NULL == pr) {
1!
587
      T_LONG_JMP(pTaskInfo->env, terrno);
×
588
    }
589

590
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
1!
591
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
592
      T_LONG_JMP(pTaskInfo->env, terrno);
×
593
    }
594

595
    if (pr->closed) {
1!
596
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
597
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
598
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
599
        T_LONG_JMP(pTaskInfo->env, terrno);
×
600
      }
601
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
602
      taosMemoryFree(pNode);
×
603
      continue;
×
604
    }
605

606
    STimeWindow w = pr->win;
1✔
607
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
1✔
608
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
609
    if (ret != TSDB_CODE_SUCCESS) {
1!
610
      T_LONG_JMP(pTaskInfo->env, ret);
×
611
    }
612

613
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
1!
614
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
615
      T_LONG_JMP(pTaskInfo->env, terrno);
×
616
    }
617

618
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
1✔
619
    if (!pTsKey) {
1!
620
      pTaskInfo->code = terrno;
×
621
      T_LONG_JMP(pTaskInfo->env, terrno);
×
622
    }
623

624
    int64_t prevTs = *(int64_t*)pTsKey->pData;
1✔
625
    if (groupId == pBlock->info.id.groupId) {
1!
626
      TSKEY curTs = pBlock->info.window.skey;
1✔
627
      if (tsCols != NULL) {
1!
628
        curTs = tsCols[startPos];
1✔
629
      }
630
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
1✔
631
                                RESULT_ROW_END_INTERP, pSup);
632
    }
633

634
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
1✔
635
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
1✔
636

637
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
1✔
638
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
1✔
639
                                          pBlock->info.rows, numOfExprs);
1✔
640
    if (ret != TSDB_CODE_SUCCESS) {
1!
641
      T_LONG_JMP(pTaskInfo->env, ret);
×
642
    }
643

644
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
1!
645
      closeResultRow(pr);
1✔
646
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
1✔
647
      taosMemoryFree(pNode);
1!
648
    } else {  // the remains are can not be closed yet.
649
      break;
×
650
    }
651
  }
652
}
61,171✔
653

654
static bool tsKeyCompFn(void* l, void* r, void* param) {
2,179,774✔
655
  TSKEY*                    lTS = (TSKEY*)l;
2,179,774✔
656
  TSKEY*                    rTS = (TSKEY*)r;
2,179,774✔
657
  SIntervalAggOperatorInfo* pInfo = param;
2,179,774✔
658
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
2,179,774!
659
}
660

661
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
267,110✔
662
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
267,110✔
663
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
267,110✔
664
  return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
267,110✔
665
}
666

667
/**
668
 * @brief check if cur window should be filtered out by limit info
669
 * @retval true if should be filtered out
670
 * @retval false if not filtering out
671
 * @note If no limit info, we skip filtering.
672
 *       If input/output ts order mismatch, we skip filtering too.
673
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
674
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
675
 *       every tuple in every block.
676
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
677
 */
678
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
193,577,975✔
679
                                  SExecTaskInfo* pTaskInfo) {
680
  int32_t code = TSDB_CODE_SUCCESS;
193,577,975✔
681
  int32_t lino = 0;
193,577,975✔
682
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
193,577,975✔
683
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
1,683,259✔
684
      // if input/output ts order mismatch, no filter
685
  ) {
686
    return false;
193,219,260✔
687
  }
688

689
  if (pOperatorInfo->limit == 0) return true;
358,715✔
690

691
  if (pOperatorInfo->pBQ == NULL) {
358,711✔
692
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
20,222✔
693
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
20,227!
694
  }
695

696
  bool shouldFilter = false;
358,716✔
697
  // if BQ has been full, compare it with top of BQ
698
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
358,716✔
699
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
101,463✔
700
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
101,463✔
701
  }
702
  if (shouldFilter) {
358,692✔
703
    return true;
91,581✔
704
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
267,111✔
705
    return false;
25,289✔
706
  }
707

708
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
709
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
241,986!
710
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
241,993!
711

712
  *((TSKEY*)node.data) = win->skey;
241,993✔
713

714
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
241,993!
715
    taosMemoryFree(node.data);
×
716
    return true;
×
717
  }
718

719
_end:
241,838✔
720
  if (code != TSDB_CODE_SUCCESS) {
241,838!
721
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
722
    pTaskInfo->code = code;
×
723
    T_LONG_JMP(pTaskInfo->env, code);
×
724
  }
725
  return false;
241,838✔
726
}
727

728
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
2,167,076✔
729
                            int32_t scanFlag) {
730
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
2,167,076✔
731

732
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
2,167,076✔
733
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
2,167,076✔
734

735
  int32_t     startPos = 0;
2,167,076✔
736
  int32_t     numOfOutput = pSup->numOfExprs;
2,167,076✔
737
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
2,167,076✔
738
  uint64_t    tableGroupId = pBlock->info.id.groupId;
2,166,878✔
739
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
2,166,878✔
740
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
2,166,878✔
741
  SResultRow* pResult = NULL;
2,166,875✔
742

743
  if (tableGroupId != pInfo->curGroupId) {
2,166,875✔
744
    pInfo->handledGroupNum += 1;
90,913✔
745
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
90,913✔
746
      return true;
12✔
747
    } else {
748
      pInfo->curGroupId = tableGroupId;
90,901✔
749
      destroyBoundedQueue(pInfo->pBQ);
90,901✔
750
      pInfo->pBQ = NULL;
90,897✔
751
    }
752
  }
753

754
  STimeWindow win =
755
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
2,166,859✔
756
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;
2,166,755✔
757

758
  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
2,081,282✔
759
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
760
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
2,081,512!
761
    T_LONG_JMP(pTaskInfo->env, ret);
16!
762
  }
763

764
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
2,081,496✔
765
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
2,081,496✔
766
                                                 pInfo->binfo.inputTsOrder);
767

768
  // prev time window not interpolation yet.
769
  if (pInfo->timeWindowInterpo) {
2,081,415✔
770
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
61,171✔
771
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
61,171✔
772

773
    // restore current time window
774
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
61,171✔
775
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
776
    if (ret != TSDB_CODE_SUCCESS) {
61,171!
777
      T_LONG_JMP(pTaskInfo->env, ret);
×
778
    }
779

780
    // window start key interpolation
781
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
61,171✔
782
    if (ret != TSDB_CODE_SUCCESS) {
61,170!
783
      T_LONG_JMP(pTaskInfo->env, ret);
×
784
    }
785
  }
786
  // qDebug("hashIntervalAgg1 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
787
  //   win.skey, win.ekey, startPos, forwardRows);
788
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
2,081,414✔
789
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
2,081,390✔
790
                                        pBlock->info.rows, numOfOutput);
2,081,390✔
791
  if (ret != TSDB_CODE_SUCCESS) {
2,081,590!
792
    T_LONG_JMP(pTaskInfo->env, ret);
×
793
  }
794

795
  doCloseWindow(pResultRowInfo, pInfo, pResult);
2,081,590✔
796

797
  STimeWindow nextWin = win;
2,081,556✔
798
  while (1) {
192,068,527✔
799
    int32_t prevEndPos = forwardRows - 1 + startPos;
194,150,083✔
800
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
194,150,083✔
801
                                      pInfo->binfo.inputTsOrder);
802
    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
193,477,557!
803
      break;
804
    }
805
    // null data, failed to allocate more memory buffer
806
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
191,391,681✔
807
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
808
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
191,255,186!
809
      T_LONG_JMP(pTaskInfo->env, code);
×
810
    }
811

812
    // qDebug("hashIntervalAgg2 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
813
      // nextWin.skey, nextWin.ekey, startPos, forwardRows);
814

815
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
191,283,840✔
816
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
191,283,840✔
817
                                           pInfo->binfo.inputTsOrder);
818
    // window start(end) key interpolation
819
    code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
191,280,925✔
820
    if (code != TSDB_CODE_SUCCESS) {
191,537,778!
821
      T_LONG_JMP(pTaskInfo->env, code);
×
822
    }
823
    // TODO: add to open window? how to close the open windows after input blocks exhausted?
824
#if 0
825
    if ((ascScan && ekey <= pBlock->info.window.ekey) ||
826
        (!ascScan && ekey >= pBlock->info.window.skey)) {
827
      // window start(end) key interpolation
828
      doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
829
    } else if (pInfo->timeWindowInterpo) {
830
      addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
831
    }
832
#endif
833
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
191,537,778✔
834
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
191,553,739✔
835
                                          pBlock->info.rows, numOfOutput);
191,553,739✔
836
    if (ret != TSDB_CODE_SUCCESS) {
192,109,560!
837
      T_LONG_JMP(pTaskInfo->env, ret);
×
838
    }
839
    doCloseWindow(pResultRowInfo, pInfo, pResult);
192,109,560✔
840
  }
841

842
  if (pInfo->timeWindowInterpo) {
2,038,763✔
843
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
61,171✔
844
  }
845
  return false;
2,081,637✔
846
}
847

848
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
194,156,034✔
849
  // current result is done in computing final results.
850
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
194,156,034✔
851
    closeResultRow(pResult);
9,884,746✔
852
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
9,884,270✔
853
    taosMemoryFree(pNode);
9,884,209!
854
  }
855
}
194,154,649✔
856

857
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
61,171✔
858
                                       SExecTaskInfo* pTaskInfo) {
859
  int32_t         code = TSDB_CODE_SUCCESS;
61,171✔
860
  int32_t         lino = 0;
61,171✔
861
  SOpenWindowInfo openWin = {0};
61,171✔
862
  openWin.pos.pageId = pResult->pageId;
61,171✔
863
  openWin.pos.offset = pResult->offset;
61,171✔
864
  openWin.groupId = groupId;
61,171✔
865
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
61,171✔
866
  if (pn == NULL) {
61,171✔
867
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
59,334✔
868
    QUERY_CHECK_CODE(code, lino, _end);
59,334!
869
    return openWin.pos;
59,334✔
870
  }
871

872
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
1,837✔
873
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
1,837!
874
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
1✔
875
    QUERY_CHECK_CODE(code, lino, _end);
1!
876
  }
877

878
_end:
1,837✔
879
  if (code != TSDB_CODE_SUCCESS) {
1,837!
880
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
881
    pTaskInfo->code = code;
×
882
    T_LONG_JMP(pTaskInfo->env, code);
×
883
  }
884
  return openWin.pos;
1,837✔
885
}
886

887
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
2,571,702✔
888
  TSKEY* tsCols = NULL;
2,571,702✔
889

890
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
2,571,702!
891
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
2,571,734✔
892
    if (!pColDataInfo) {
2,571,399!
893
      pTaskInfo->code = terrno;
×
894
      T_LONG_JMP(pTaskInfo->env, terrno);
×
895
    }
896

897
    tsCols = (int64_t*)pColDataInfo->pData;
2,571,399✔
898
    if (tsCols[0] == 0) {
2,571,399✔
899
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
2!
900
            tsCols[pBlock->info.rows - 1]);
901
    }
902

903
    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
2,571,452!
904
      int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
814,438✔
905
      if (code != TSDB_CODE_SUCCESS) {
814,449!
906
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
907
        pTaskInfo->code = code;
×
908
        T_LONG_JMP(pTaskInfo->env, code);
×
909
      }
910
    }
911
  }
912

913
  return tsCols;
2,571,431✔
914
}
915

916
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
1,876,715✔
917
  if (OPTR_IS_OPENED(pOperator)) {
1,876,715✔
918
    return TSDB_CODE_SUCCESS;
456,741✔
919
  }
920

921
  int32_t        code = TSDB_CODE_SUCCESS;
1,419,974✔
922
  int32_t        lino = 0;
1,419,974✔
923
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,419,974✔
924
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1,419,974✔
925

926
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
1,419,974✔
927
  SExprSupp*                pSup = &pOperator->exprSupp;
1,419,974✔
928

929
  int32_t scanFlag = MAIN_SCAN;
1,419,974✔
930
  int64_t st = taosGetTimestampUs();
1,423,577✔
931

932
  pInfo->cleanGroupResInfo = false;
1,423,577✔
933
  while (1) {
2,167,041✔
934
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
3,590,618✔
935
    if (pBlock == NULL) {
3,592,281✔
936
      break;
1,425,671✔
937
    }
938

939
    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
2,166,610✔
940

941
    if (pInfo->scalarSupp.pExprInfo != NULL) {
2,166,610✔
942
      SExprSupp* pExprSup = &pInfo->scalarSupp;
105,775✔
943
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
105,775✔
944
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
105,775!
945
      QUERY_CHECK_CODE(code, lino, _end);
105,763!
946
    }
947

948
    // the pDataBlock are always the same one, no need to call this again
949
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
2,166,598✔
950
    QUERY_CHECK_CODE(code, lino, _end);
2,167,103!
951
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
2,167,103✔
952
  }
953

954
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
1,425,683✔
955
  QUERY_CHECK_CODE(code, lino, _end);
1,425,616!
956
  pInfo->cleanGroupResInfo = true;
1,425,616✔
957

958
  OPTR_SET_OPENED(pOperator);
1,425,616✔
959

960
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1,425,835✔
961

962
_end:
1,425,835✔
963
  if (code != TSDB_CODE_SUCCESS) {
1,425,835!
964
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
965
    pTaskInfo->code = code;
×
966
    T_LONG_JMP(pTaskInfo->env, code);
×
967
  }
968
  return code;
1,425,835✔
969
}
970

971
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
67,350✔
972
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
67,350✔
973
  SExprSupp*     pSup = &pOperator->exprSupp;
67,350✔
974

975
  SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
67,350✔
976
  if (!pStateColInfoData) {
67,351!
977
    pTaskInfo->code = terrno;
×
978
    T_LONG_JMP(pTaskInfo->env, terrno);
×
979
  }
980
  int64_t gid = pBlock->info.id.groupId;
67,351✔
981

982
  bool    hasResult = false;
67,351✔
983
  bool    masterScan = true;
67,351✔
984
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
67,351✔
985
  int32_t bytes = pStateColInfoData->info.bytes;
67,351✔
986

987
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
67,351✔
988
  if (!pColInfoData) {
67,351✔
989
    pTaskInfo->code = terrno;
1✔
990
    T_LONG_JMP(pTaskInfo->env, terrno);
×
991
  }
992
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
67,350✔
993

994
  SWindowRowsSup* pRowSup = &pInfo->winSup;
67,350✔
995
  pRowSup->numOfRows = 0;
67,350✔
996
  pRowSup->startRowIndex = 0;
67,350✔
997

998
  struct SColumnDataAgg* pAgg = NULL;
67,350✔
999
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
20,941,733✔
1000
    pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
20,874,319!
1001
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
41,748,638✔
1002
      continue;
1,690✔
1003
    }
1004
    hasResult = true;
20,872,629✔
1005
    if (pStateColInfoData->pData == NULL) {
20,872,629!
1006
      qError("%s:%d state column data is null", __FILE__, __LINE__);
×
1007
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1008
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1009
    }
1010

1011
    char* val = colDataGetData(pStateColInfoData, j);
20,872,629!
1012

1013
    if (gid != pRowSup->groupId || !pInfo->hasKey) {
20,872,629✔
1014
      // todo extract method
1015
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
54,364!
UNCOV
1016
        if (IS_STR_DATA_BLOB(pInfo->stateKey.type)) {
×
1017
          blobDataCopy(pInfo->stateKey.pData, val);
×
1018
        } else {
1019
          varDataCopy(pInfo->stateKey.pData, val);
127✔
1020
        }
1021
      } else {
1022
        memcpy(pInfo->stateKey.pData, val, bytes);
54,909✔
1023
      }
1024

1025
      pInfo->hasKey = true;
54,364✔
1026

1027
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
54,364✔
1028
      doKeepTuple(pRowSup, tsList[j], gid);
55,036✔
1029
    } else if (compareVal(val, &pInfo->stateKey)) {
20,818,265✔
1030
      doKeepTuple(pRowSup, tsList[j], gid);
2,317,371✔
1031
    } else {  // a new state window started
1032
      SResultRow* pResult = NULL;
18,500,974✔
1033

1034
      // keep the time window for the closed time window.
1035
      STimeWindow window = pRowSup->win;
18,500,974✔
1036

1037
      pRowSup->win.ekey = pRowSup->win.skey;
18,500,974✔
1038
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
18,500,974✔
1039
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1040
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
18,500,427!
1041
        T_LONG_JMP(pTaskInfo->env, ret);
×
1042
      }
1043

1044
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
18,500,427✔
1045
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
18,500,204✔
1046
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
18,500,204✔
1047
      if (ret != TSDB_CODE_SUCCESS) {
18,500,901!
1048
        T_LONG_JMP(pTaskInfo->env, ret);
×
1049
      }
1050

1051
      // here we start a new session window
1052
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
18,500,901✔
1053
      doKeepTuple(pRowSup, tsList[j], gid);
18,500,774✔
1054

1055
      // todo extract method
1056
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
18,500,694!
1057
        if (IS_STR_DATA_BLOB(pInfo->stateKey.type)) {
9!
1058
          blobDataCopy(pInfo->stateKey.pData, val);
×
1059
        } else {
1060
          varDataCopy(pInfo->stateKey.pData, val);
9✔
1061
        }
1062
      } else {
1063
        memcpy(pInfo->stateKey.pData, val, bytes);
18,500,685✔
1064
      }
1065
    }
1066
  }
1067

1068
  if (!hasResult) {
67,414✔
1069
    return;
169✔
1070
  }
1071
  SResultRow* pResult = NULL;
67,245✔
1072
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
67,245✔
1073
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
67,245✔
1074
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1075
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
67,184!
1076
    T_LONG_JMP(pTaskInfo->env, ret);
×
1077
  }
1078

1079
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
67,184✔
1080
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
67,184✔
1081
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
67,184✔
1082
  if (ret != TSDB_CODE_SUCCESS) {
67,184✔
1083
    T_LONG_JMP(pTaskInfo->env, ret);
2!
1084
  }
1085
}
1086

1087
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
70,155✔
1088
  if (OPTR_IS_OPENED(pOperator)) {
70,155✔
1089
    return TSDB_CODE_SUCCESS;
359✔
1090
  }
1091

1092
  int32_t                   code = TSDB_CODE_SUCCESS;
69,796✔
1093
  int32_t                   lino = 0;
69,796✔
1094
  SStateWindowOperatorInfo* pInfo = pOperator->info;
69,796✔
1095
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
69,796✔
1096

1097
  SExprSupp* pSup = &pOperator->exprSupp;
69,796✔
1098
  int32_t    order = pInfo->binfo.inputTsOrder;
69,796✔
1099
  int64_t    st = taosGetTimestampUs();
69,796✔
1100

1101
  SOperatorInfo* downstream = pOperator->pDownstream[0];
69,796✔
1102
  pInfo->cleanGroupResInfo = false;
69,796✔
1103
  while (1) {
67,351✔
1104
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
137,147✔
1105
    if (pBlock == NULL) {
137,145✔
1106
      break;
69,794✔
1107
    }
1108

1109
    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
67,351✔
1110
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
67,351✔
1111
    QUERY_CHECK_CODE(code, lino, _end);
67,352!
1112

1113
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
67,352✔
1114
    QUERY_CHECK_CODE(code, lino, _end);
67,352!
1115

1116
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
1117
    if (pInfo->scalarSup.pExprInfo != NULL) {
67,352✔
1118
      pTaskInfo->code =
2,559✔
1119
          projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
2,561✔
1120
                                pInfo->scalarSup.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
2,561!
1121
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
2,559!
1122
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1123
      }
1124
    }
1125

1126
    doStateWindowAggImpl(pOperator, pInfo, pBlock);
67,350✔
1127
  }
1128

1129
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
69,794✔
1130
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
69,794✔
1131
  QUERY_CHECK_CODE(code, lino, _end);
69,794!
1132
  pInfo->cleanGroupResInfo = true;
69,794✔
1133
  pOperator->status = OP_RES_TO_RETURN;
69,794✔
1134

1135
_end:
69,794✔
1136
  if (code != TSDB_CODE_SUCCESS) {
69,794!
1137
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1138
    pTaskInfo->code = code;
×
1139
    T_LONG_JMP(pTaskInfo->env, code);
×
1140
  }
1141
  return code;
69,794✔
1142
}
1143

1144
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
123,459✔
1145
  if (pOperator->status == OP_EXEC_DONE) {
123,459✔
1146
    (*ppRes) = NULL;
53,305✔
1147
    return TSDB_CODE_SUCCESS;
53,305✔
1148
  }
1149

1150
  int32_t                   code = TSDB_CODE_SUCCESS;
70,154✔
1151
  int32_t                   lino = 0;
70,154✔
1152
  SStateWindowOperatorInfo* pInfo = pOperator->info;
70,154✔
1153
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
70,154✔
1154
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;
70,154✔
1155

1156
  code = pOperator->fpSet._openFn(pOperator);
70,154✔
1157
  QUERY_CHECK_CODE(code, lino, _end);
70,153!
1158

1159
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
70,153✔
1160
  QUERY_CHECK_CODE(code, lino, _end);
70,153!
1161

1162
  while (1) {
×
1163
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
70,153✔
1164
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
70,153✔
1165
    QUERY_CHECK_CODE(code, lino, _end);
70,153!
1166

1167
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
70,153✔
1168
    if (!hasRemain) {
70,153✔
1169
      setOperatorCompleted(pOperator);
69,794✔
1170
      break;
69,794✔
1171
    }
1172

1173
    if (pBInfo->pRes->info.rows > 0) {
359!
1174
      break;
359✔
1175
    }
1176
  }
1177

1178
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
70,153✔
1179

1180
_end:
70,153✔
1181
  if (code != TSDB_CODE_SUCCESS) {
70,153!
1182
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1183
    pTaskInfo->code = code;
×
1184
    T_LONG_JMP(pTaskInfo->env, code);
×
1185
  }
1186
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
70,153✔
1187
  return code;
70,153✔
1188
}
1189

1190
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
2,661,885✔
1191
  int32_t                   code = TSDB_CODE_SUCCESS;
2,661,885✔
1192
  int32_t                   lino = 0;
2,661,885✔
1193
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
2,661,885✔
1194
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
2,661,885✔
1195

1196
  if (pOperator->status == OP_EXEC_DONE) {
2,661,885✔
1197
    (*ppRes) = NULL;
784,960✔
1198
    return code;
784,960✔
1199
  }
1200

1201
  SSDataBlock* pBlock = pInfo->binfo.pRes;
1,876,925✔
1202
  code = pOperator->fpSet._openFn(pOperator);
1,876,925✔
1203
  QUERY_CHECK_CODE(code, lino, _end);
1,882,398!
1204

1205
  while (1) {
×
1206
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,882,398✔
1207
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
1,881,984✔
1208
    QUERY_CHECK_CODE(code, lino, _end);
1,882,011!
1209

1210
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,882,011✔
1211
    if (!hasRemain) {
1,881,915✔
1212
      setOperatorCompleted(pOperator);
1,424,872✔
1213
      break;
1,425,458✔
1214
    }
1215

1216
    if (pBlock->info.rows > 0) {
457,043!
1217
      break;
457,043✔
1218
    }
1219
  }
1220

1221
  size_t rows = pBlock->info.rows;
1,882,501✔
1222
  pOperator->resultInfo.totalRows += rows;
1,882,501✔
1223

1224
_end:
1,882,501✔
1225
  if (code != TSDB_CODE_SUCCESS) {
1,882,501!
1226
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1227
    pTaskInfo->code = code;
×
1228
    T_LONG_JMP(pTaskInfo->env, code);
×
1229
  }
1230
  (*ppRes) = (rows == 0) ? NULL : pBlock;
1,882,501✔
1231
  return code;
1,882,501✔
1232
}
1233

1234
static void destroyStateWindowOperatorInfo(void* param) {
69,796✔
1235
  if (param == NULL) {
69,796!
1236
    return;
×
1237
  }
1238
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
69,796✔
1239
  cleanupBasicInfo(&pInfo->binfo);
69,796✔
1240
  taosMemoryFreeClear(pInfo->stateKey.pData);
69,796!
1241
  if (pInfo->pOperator) {
69,796!
1242
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
69,796✔
1243
                      pInfo->cleanGroupResInfo);
69,796✔
1244
    pInfo->pOperator = NULL;
69,796✔
1245
  }
1246

1247
  cleanupExprSupp(&pInfo->scalarSup);
69,796✔
1248
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
69,796✔
1249
  cleanupAggSup(&pInfo->aggSup);
69,796✔
1250
  cleanupGroupResInfo(&pInfo->groupResInfo);
69,796✔
1251

1252
  taosMemoryFreeClear(param);
69,796!
1253
}
1254

1255
static void freeItem(void* param) {
187,704✔
1256
  SGroupKeys* pKey = (SGroupKeys*)param;
187,704✔
1257
  taosMemoryFree(pKey->pData);
187,704!
1258
}
187,704✔
1259

1260
void destroyIntervalOperatorInfo(void* param) {
1,960,452✔
1261
  if (param == NULL) {
1,960,452!
1262
    return;
×
1263
  }
1264

1265
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
1,960,452✔
1266

1267
  cleanupBasicInfo(&pInfo->binfo);
1,960,452✔
1268
  if (pInfo->pOperator) {
1,960,612!
1269
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,960,613✔
1270
                      pInfo->cleanGroupResInfo);
1,960,613✔
1271
    pInfo->pOperator = NULL;
1,960,134✔
1272
  }
1273

1274
  cleanupAggSup(&pInfo->aggSup);
1,960,133✔
1275
  cleanupExprSupp(&pInfo->scalarSupp);
1,960,608✔
1276

1277
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
1,960,555✔
1278

1279
  taosArrayDestroy(pInfo->pInterpCols);
1,960,554✔
1280
  pInfo->pInterpCols = NULL;
1,960,345✔
1281

1282
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
1,960,345✔
1283
  pInfo->pPrevValues = NULL;
1,960,298✔
1284

1285
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,960,298✔
1286
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,960,591✔
1287
  destroyBoundedQueue(pInfo->pBQ);
1,960,623✔
1288
  taosMemoryFreeClear(param);
1,960,229!
1289
}
1290

1291
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
93,851✔
1292
  int32_t code = TSDB_CODE_SUCCESS;
93,851✔
1293
  int32_t lino = 0;
93,851✔
1294
  void*   tmp = NULL;
93,851✔
1295

1296
  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
93,851✔
1297
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
93,852!
1298

1299
  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
93,852✔
1300
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
93,852!
1301

1302
  {  // ts column
1303
    SColumn c = {0};
93,852✔
1304
    c.colId = 1;
93,852✔
1305
    c.slotId = pInfo->primaryTsIndex;
93,852✔
1306
    c.type = TSDB_DATA_TYPE_TIMESTAMP;
93,852✔
1307
    c.bytes = sizeof(int64_t);
93,852✔
1308
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
93,852✔
1309
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
93,851!
1310

1311
    SGroupKeys key;
1312
    key.bytes = c.bytes;
93,851✔
1313
    key.type = c.type;
93,851✔
1314
    key.isNull = true;  // to denote no value is assigned yet
93,851✔
1315
    key.pData = taosMemoryCalloc(1, c.bytes);
93,851!
1316
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
93,852!
1317

1318
    tmp = taosArrayPush(pInfo->pPrevValues, &key);
93,852✔
1319
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
93,851!
1320
  }
1321
_end:
93,851✔
1322
  if (code != TSDB_CODE_SUCCESS) {
93,851!
1323
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1324
  }
1325
  return code;
93,851✔
1326
}
1327

1328
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
1,956,920✔
1329
                                      bool* pRes) {
1330
  // the primary timestamp column
1331
  bool    needed = false;
1,956,920✔
1332
  int32_t code = TSDB_CODE_SUCCESS;
1,956,920✔
1333
  int32_t lino = 0;
1,956,920✔
1334
  void*   tmp = NULL;
1,956,920✔
1335

1336
  for (int32_t i = 0; i < numOfCols; ++i) {
6,265,619✔
1337
    SExprInfo* pExpr = pCtx[i].pExpr;
4,401,548✔
1338
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
4,401,548✔
1339
      needed = true;
93,852✔
1340
      break;
93,852✔
1341
    }
1342
  }
1343

1344
  if (needed) {
1,957,923✔
1345
    code = initWindowInterpPrevVal(pInfo);
93,852✔
1346
    QUERY_CHECK_CODE(code, lino, _end);
93,851!
1347
  }
1348

1349
  for (int32_t i = 0; i < numOfCols; ++i) {
6,356,562✔
1350
    SExprInfo* pExpr = pCtx[i].pExpr;
4,396,421✔
1351

1352
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
4,396,421✔
1353
      SFunctParam* pParam = &pExpr->base.pParam[0];
95,303✔
1354

1355
      SColumn c = *pParam->pCol;
95,303✔
1356
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
95,303✔
1357
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
93,851!
1358

1359
      SGroupKeys key = {0};
93,851✔
1360
      key.bytes = c.bytes;
93,851✔
1361
      key.type = c.type;
93,851✔
1362
      key.isNull = false;
93,851✔
1363
      key.pData = taosMemoryCalloc(1, c.bytes);
93,851!
1364
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
93,852!
1365

1366
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
93,852✔
1367
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
93,852!
1368
    }
1369
  }
1370

1371
_end:
1,960,141✔
1372
  if (code != TSDB_CODE_SUCCESS) {
1,960,141!
1373
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1374
  }
1375
  *pRes = needed;
1,958,756✔
1376
  return code;
1,958,756✔
1377
}
1378

1379
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo){
×
1380
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
1381
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
×
1382
  pOper->status = OP_NOT_OPENED;
×
1383

1384
  resetBasicOperatorState(&pIntervalInfo->binfo);
×
1385
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp, &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup,
×
1386
    pIntervalInfo->cleanGroupResInfo);
×
1387

1388
  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
×
1389
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
1390
  if (code == 0) {
×
1391
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
1392
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
1393
                       &pTaskInfo->storageAPI.functionStore);
1394
  }
1395
  if (code == 0) {
×
1396
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
×
1397
                         &pTaskInfo->storageAPI.functionStore);
1398
  }
1399

1400
  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL){
×
1401
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
×
1402
  }
1403

1404
  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
×
1405
    pIntervalInfo->curGroupId = UINT64_MAX;
×
1406
  }
1407

1408
  pIntervalInfo->cleanGroupResInfo = false;
×
1409
  pIntervalInfo->handledGroupNum = 0;
×
1410
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
×
1411
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;
×
1412

1413
  taosArrayDestroy(pIntervalInfo->pInterpCols);
×
1414
  pIntervalInfo->pInterpCols = NULL;
×
1415

1416
  if (pIntervalInfo->pPrevValues != NULL) {
×
1417
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
×
1418
    pIntervalInfo->pPrevValues = NULL;
×
1419
    initWindowInterpPrevVal(pIntervalInfo);
×
1420
  }
1421

1422
  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
×
1423
  destroyBoundedQueue(pIntervalInfo->pBQ);
×
1424
  pIntervalInfo->pBQ = NULL;
×
1425
  return code;
×
1426
}
1427

1428
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
×
1429
  SIntervalAggOperatorInfo* pInfo = pOper->info;
×
1430
  return resetInterval(pOper, pInfo);
×
1431
}
1432

1433
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
1,422,840✔
1434
                                   SOperatorInfo** pOptrInfo) {
1435
  QRY_PARAM_CHECK(pOptrInfo);
1,422,840!
1436

1437
  int32_t                   code = TSDB_CODE_SUCCESS;
1,422,840✔
1438
  int32_t                   lino = 0;
1,422,840✔
1439
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
1,422,840!
1440
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,424,017!
1441
  if (pInfo == NULL || pOperator == NULL) {
1,424,295!
1442
    code = terrno;
×
1443
    lino = __LINE__;
×
1444
    goto _error;
×
1445
  }
1446

1447
  pOperator->pPhyNode = pPhyNode;
1,424,397✔
1448
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
1,424,397✔
1449
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,424,621!
1450
  initBasicInfo(&pInfo->binfo, pResBlock);
1,424,621✔
1451

1452
  SExprSupp* pSup = &pOperator->exprSupp;
1,424,365✔
1453
  pSup->hasWindowOrGroup = true;
1,424,365✔
1454

1455
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
1,424,365✔
1456

1457
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,424,365✔
1458
  initResultSizeInfo(&pOperator->resultInfo, 512);
1,424,365✔
1459
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
1,425,129✔
1460
  QUERY_CHECK_CODE(code, lino, _error);
1,424,993!
1461

1462
  int32_t    num = 0;
1,424,993✔
1463
  SExprInfo* pExprInfo = NULL;
1,424,993✔
1464
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
1,424,993✔
1465
  QUERY_CHECK_CODE(code, lino, _error);
1,424,941!
1466

1467
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
1,424,941✔
1468
                    &pTaskInfo->storageAPI.functionStore);
1469
  QUERY_CHECK_CODE(code, lino, _error);
1,425,073!
1470

1471
  SInterval interval = {.interval = pPhyNode->interval,
1,425,073✔
1472
                        .sliding = pPhyNode->sliding,
1,425,073✔
1473
                        .intervalUnit = pPhyNode->intervalUnit,
1,425,073✔
1474
                        .slidingUnit = pPhyNode->slidingUnit,
1,425,073✔
1475
                        .offset = pPhyNode->offset,
1,425,073✔
1476
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
1,425,073✔
1477
                        .timeRange = pPhyNode->timeRange};
1478
  calcIntervalAutoOffset(&interval);
1,425,073✔
1479

1480
  STimeWindowAggSupp as = {
1,424,691✔
1481
      .waterMark = pPhyNode->window.watermark,
1,424,691✔
1482
      .calTrigger = pPhyNode->window.triggerType,
1,424,691✔
1483
      .maxTs = INT64_MIN,
1484
  };
1485

1486
  pInfo->win = pTaskInfo->window;
1,424,691✔
1487
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
1,424,691✔
1488
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
1,424,691✔
1489
  pInfo->interval = interval;
1,424,691✔
1490
  pInfo->twAggSup = as;
1,424,691✔
1491
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
1,424,691✔
1492
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
1,424,691!
1493
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
43,103✔
1494
    pInfo->limited = true;
43,103✔
1495
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
43,103✔
1496
  }
1497
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
1,424,691!
1498
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
100✔
1499
    pInfo->slimited = true;
100✔
1500
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
100✔
1501
    pInfo->curGroupId = UINT64_MAX;
100✔
1502
  }
1503

1504
  if (pPhyNode->window.pExprs != NULL) {
1,424,691✔
1505
    int32_t    numOfScalar = 0;
3,529✔
1506
    SExprInfo* pScalarExprInfo = NULL;
3,529✔
1507
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
3,529✔
1508
    QUERY_CHECK_CODE(code, lino, _error);
3,528!
1509

1510
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
3,528✔
1511
    if (code != TSDB_CODE_SUCCESS) {
3,529!
1512
      goto _error;
×
1513
    }
1514
  }
1515

1516
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,424,691✔
1517
                            pTaskInfo->pStreamRuntimeInfo);
1,424,691✔
1518
  if (code != TSDB_CODE_SUCCESS) {
1,422,802!
1519
    goto _error;
×
1520
  }
1521

1522
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
1,422,802✔
1523
  QUERY_CHECK_CODE(code, lino, _error);
1,425,167!
1524

1525
  pInfo->timeWindowInterpo = false;
1,425,167✔
1526
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
1,425,167✔
1527
  QUERY_CHECK_CODE(code, lino, _error);
1,424,180!
1528
  if (pInfo->timeWindowInterpo) {
1,424,180✔
1529
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
93,852✔
1530
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
93,852!
1531
      goto _error;
×
1532
    }
1533
  }
1534

1535
  pInfo->pOperator = pOperator;
1,424,180✔
1536
  pInfo->cleanGroupResInfo = false;
1,424,180✔
1537
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1,424,180✔
1538
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
1,421,224✔
1539
                  pInfo, pTaskInfo);
1540

1541
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
1,421,944✔
1542
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1543
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
1,421,555✔
1544
  code = appendDownstream(pOperator, &downstream, 1);
1,422,054✔
1545
  if (code != TSDB_CODE_SUCCESS) {
1,423,383!
1546
    goto _error;
×
1547
  }
1548

1549
  *pOptrInfo = pOperator;
1,423,383✔
1550
  return TSDB_CODE_SUCCESS;
1,423,383✔
1551

1552
_error:
×
1553
  if (pInfo != NULL) {
×
1554
    destroyIntervalOperatorInfo(pInfo);
×
1555
  }
1556

1557
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1558
  pTaskInfo->code = code;
×
1559
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
×
1560
  return code;
×
1561
}
1562

1563
// todo handle multiple timeline cases. assume no timeline interweaving
1564
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
119,468✔
1565
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
119,468✔
1566
  SExprSupp*     pSup = &pOperator->exprSupp;
119,468✔
1567

1568
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
119,468✔
1569
  if (!pColInfoData) {
119,467!
1570
    pTaskInfo->code = terrno;
×
1571
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1572
  }
1573

1574
  bool    masterScan = true;
119,467✔
1575
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
119,467✔
1576
  int64_t gid = pBlock->info.id.groupId;
119,467✔
1577

1578
  int64_t gap = pInfo->gap;
119,467✔
1579

1580
  if (!pInfo->reptScan) {
119,467✔
1581
    pInfo->reptScan = true;
90,336✔
1582
    pInfo->winSup.prevTs = INT64_MIN;
90,336✔
1583
  }
1584

1585
  SWindowRowsSup* pRowSup = &pInfo->winSup;
119,467✔
1586
  pRowSup->numOfRows = 0;
119,467✔
1587
  pRowSup->startRowIndex = 0;
119,467✔
1588

1589
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
1590
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
119,467✔
1591
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
36,239,766✔
1592
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
36,120,152✔
1593
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
90,202✔
1594
      doKeepTuple(pRowSup, tsList[j], gid);
90,358✔
1595
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
36,029,950✔
1596
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
10,833,715✔
1597
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1598
      doKeepTuple(pRowSup, tsList[j], gid);
25,196,295✔
1599
    } else {  // start a new session window
1600
      // start a new session window
1601
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
10,833,655✔
1602
        SResultRow* pResult = NULL;
10,817,612✔
1603

1604
        // keep the time window for the closed time window.
1605
        STimeWindow window = pRowSup->win;
10,817,612✔
1606

1607
        int32_t ret =
1608
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
10,817,612✔
1609
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1610
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
10,817,901!
1611
          T_LONG_JMP(pTaskInfo->env, ret);
×
1612
        }
1613

1614
        // pInfo->numOfRows data belong to the current session window
1615
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
10,817,901✔
1616
        ret =
1617
            applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
10,817,829✔
1618
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
10,817,829✔
1619
        if (ret != TSDB_CODE_SUCCESS) {
10,818,020!
1620
          T_LONG_JMP(pTaskInfo->env, ret);
×
1621
        }
1622
      }
1623

1624
      // here we start a new session window
1625
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
10,834,063✔
1626
      doKeepTuple(pRowSup, tsList[j], gid);
10,833,942✔
1627
    }
1628
  }
1629

1630
  SResultRow* pResult = NULL;
119,614✔
1631
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
119,614✔
1632
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
119,614✔
1633
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1634
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
119,468!
1635
    T_LONG_JMP(pTaskInfo->env, ret);
×
1636
  }
1637

1638
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
119,468✔
1639
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
119,468✔
1640
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
119,468✔
1641
  if (ret != TSDB_CODE_SUCCESS) {
119,468!
1642
    T_LONG_JMP(pTaskInfo->env, ret);
×
1643
  }
1644
}
119,468✔
1645

1646
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
194,790✔
1647
  if (pOperator->status == OP_EXEC_DONE) {
194,790✔
1648
    (*ppRes) = NULL;
89,437✔
1649
    return TSDB_CODE_SUCCESS;
89,437✔
1650
  }
1651

1652
  int32_t                  code = TSDB_CODE_SUCCESS;
105,353✔
1653
  int32_t                  lino = 0;
105,353✔
1654
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
105,353✔
1655
  SSessionAggOperatorInfo* pInfo = pOperator->info;
105,353✔
1656
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
105,353✔
1657
  SExprSupp*               pSup = &pOperator->exprSupp;
105,353✔
1658

1659
  if (pOperator->status == OP_RES_TO_RETURN) {
105,353✔
1660
    while (1) {
×
1661
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
21✔
1662
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
21✔
1663
      QUERY_CHECK_CODE(code, lino, _end);
21!
1664

1665
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
21✔
1666
      if (!hasRemain) {
21✔
1667
        setOperatorCompleted(pOperator);
14✔
1668
        break;
14✔
1669
      }
1670

1671
      if (pBInfo->pRes->info.rows > 0) {
7!
1672
        break;
7✔
1673
      }
1674
    }
1675
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
21✔
1676
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
21!
1677
    return code;
21✔
1678
  }
1679

1680
  int64_t st = taosGetTimestampUs();
105,332✔
1681
  int32_t order = pInfo->binfo.inputTsOrder;
105,332✔
1682

1683
  SOperatorInfo* downstream = pOperator->pDownstream[0];
105,332✔
1684

1685
  pInfo->cleanGroupResInfo = false;
105,332✔
1686
  while (1) {
119,468✔
1687
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
224,800✔
1688
    if (pBlock == NULL) {
224,799✔
1689
      break;
105,332✔
1690
    }
1691

1692
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
119,467✔
1693
    if (pInfo->scalarSupp.pExprInfo != NULL) {
119,467✔
1694
      SExprSupp* pExprSup = &pInfo->scalarSupp;
149✔
1695
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
149✔
1696
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
149!
1697
      QUERY_CHECK_CODE(code, lino, _end);
149!
1698
    }
1699
    // the pDataBlock are always the same one, no need to call this again
1700
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
119,467✔
1701
    QUERY_CHECK_CODE(code, lino, _end);
119,468!
1702

1703
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
119,468✔
1704
    QUERY_CHECK_CODE(code, lino, _end);
119,468!
1705

1706
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
119,468✔
1707
  }
1708

1709
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
105,332✔
1710

1711
  // restore the value
1712
  pOperator->status = OP_RES_TO_RETURN;
105,332✔
1713

1714
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
105,332✔
1715
  QUERY_CHECK_CODE(code, lino, _end);
105,332!
1716
  pInfo->cleanGroupResInfo = true;
105,332✔
1717

1718
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
105,332✔
1719
  QUERY_CHECK_CODE(code, lino, _end);
105,332!
1720
  while (1) {
×
1721
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
105,332✔
1722
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
105,331✔
1723
    QUERY_CHECK_CODE(code, lino, _end);
105,331!
1724

1725
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
105,331✔
1726
    if (!hasRemain) {
105,332✔
1727
      setOperatorCompleted(pOperator);
105,318✔
1728
      break;
105,318✔
1729
    }
1730

1731
    if (pBInfo->pRes->info.rows > 0) {
14!
1732
      break;
14✔
1733
    }
1734
  }
1735
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
105,332✔
1736

1737
_end:
105,332✔
1738
  if (code != TSDB_CODE_SUCCESS) {
105,332!
1739
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1740
    pTaskInfo->code = code;
×
1741
    T_LONG_JMP(pTaskInfo->env, code);
×
1742
  }
1743
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
105,332✔
1744
  return code;
105,332✔
1745
}
1746

1747
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
×
1748
  SStateWindowOperatorInfo* pInfo = pOper->info;
×
1749
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
1750
  SStateWinodwPhysiNode* pPhynode = (SStateWinodwPhysiNode*)pOper->pPhyNode;
×
1751
  pOper->status = OP_NOT_OPENED;
×
1752

1753
  resetBasicOperatorState(&pInfo->binfo);
×
1754
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
×
1755
                    pInfo->cleanGroupResInfo);
×
1756

1757
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
×
1758
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
1759
  if (code == 0) {
×
1760
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
1761
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
1762
                       &pTaskInfo->storageAPI.functionStore);
1763
  }
1764
  if (code == 0) {
×
1765
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->window.pExprs, NULL,
×
1766
                         &pTaskInfo->storageAPI.functionStore);
1767
  }
1768

1769
  pInfo->cleanGroupResInfo = false;
×
1770
  pInfo->hasKey = false;
×
1771

1772
  cleanupGroupResInfo(&pInfo->groupResInfo);
×
1773
  memset(pInfo->stateKey.pData, 0, pInfo->stateKey.bytes);
×
1774
  return code;
×
1775
}
1776

1777
// todo make this as an non-blocking operator
1778
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
69,796✔
1779
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1780
  QRY_PARAM_CHECK(pOptrInfo);
69,796!
1781

1782
  int32_t                   code = TSDB_CODE_SUCCESS;
69,796✔
1783
  int32_t                   lino = 0;
69,796✔
1784
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
69,796!
1785
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
69,796!
1786
  if (pInfo == NULL || pOperator == NULL) {
69,796!
1787
    code = terrno;
×
1788
    goto _error;
×
1789
  }
1790

1791
  pOperator->pPhyNode = pStateNode;
69,796✔
1792
  pOperator->exprSupp.hasWindowOrGroup = true;
69,796✔
1793
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
69,796✔
1794
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
69,796✔
1795

1796
  if (pStateNode->window.pExprs != NULL) {
69,796✔
1797
    int32_t    numOfScalarExpr = 0;
2,248✔
1798
    SExprInfo* pScalarExprInfo = NULL;
2,248✔
1799
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
2,248✔
1800
    QUERY_CHECK_CODE(code, lino, _error);
2,248!
1801

1802
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
2,248✔
1803
    if (code != TSDB_CODE_SUCCESS) {
2,248!
1804
      goto _error;
×
1805
    }
1806
  }
1807

1808
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
69,796✔
1809
  pInfo->stateKey.type = pInfo->stateCol.type;
69,796✔
1810
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
69,796✔
1811
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
69,796!
1812
  if (pInfo->stateKey.pData == NULL) {
69,796!
1813
    goto _error;
×
1814
  }
1815
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
69,796✔
1816
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
69,796✔
1817

1818
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
69,796✔
1819
                            pTaskInfo->pStreamRuntimeInfo);
69,796✔
1820
  if (code != TSDB_CODE_SUCCESS) {
69,796!
1821
    goto _error;
×
1822
  }
1823

1824
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
69,796✔
1825

1826
  int32_t    num = 0;
69,796✔
1827
  SExprInfo* pExprInfo = NULL;
69,796✔
1828
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
69,796✔
1829
  QUERY_CHECK_CODE(code, lino, _error);
69,796!
1830

1831
  initResultSizeInfo(&pOperator->resultInfo, 4096);
69,796✔
1832

1833
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
69,796✔
1834
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
69,796✔
1835
  if (code != TSDB_CODE_SUCCESS) {
69,796!
1836
    goto _error;
×
1837
  }
1838

1839
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
69,796✔
1840
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
69,796!
1841
  initBasicInfo(&pInfo->binfo, pResBlock);
69,796✔
1842
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
69,796✔
1843

1844
  pInfo->twAggSup =
69,796✔
1845
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
69,796✔
1846

1847
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
69,796✔
1848
  QUERY_CHECK_CODE(code, lino, _error);
69,795!
1849

1850
  pInfo->tsSlotId = tsSlotId;
69,795✔
1851
  pInfo->pOperator = pOperator;
69,795✔
1852
  pInfo->cleanGroupResInfo = false;
69,795✔
1853
  pInfo->trueForLimit = pStateNode->trueForLimit;
69,795✔
1854
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
69,795✔
1855
                  pTaskInfo);
1856
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
69,796✔
1857
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1858
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
69,796✔
1859

1860
  code = appendDownstream(pOperator, &downstream, 1);
69,796✔
1861
  if (code != TSDB_CODE_SUCCESS) {
69,796!
1862
    goto _error;
×
1863
  }
1864

1865
  *pOptrInfo = pOperator;
69,796✔
1866
  return TSDB_CODE_SUCCESS;
69,796✔
1867

1868
_error:
×
1869
  if (pInfo != NULL) {
×
1870
    destroyStateWindowOperatorInfo(pInfo);
×
1871
  }
1872

1873
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1874
  pTaskInfo->code = code;
×
1875
  return code;
×
1876
}
1877

1878
void destroySWindowOperatorInfo(void* param) {
105,332✔
1879
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
105,332✔
1880
  if (pInfo == NULL) {
105,332!
1881
    return;
×
1882
  }
1883

1884
  cleanupBasicInfo(&pInfo->binfo);
105,332✔
1885
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
105,332✔
1886
  if (pInfo->pOperator) {
105,332!
1887
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
105,332✔
1888
                      pInfo->cleanGroupResInfo);
105,332✔
1889
    pInfo->pOperator = NULL;
105,331✔
1890
  }
1891

1892
  cleanupAggSup(&pInfo->aggSup);
105,331✔
1893
  cleanupExprSupp(&pInfo->scalarSupp);
105,331✔
1894

1895
  cleanupGroupResInfo(&pInfo->groupResInfo);
105,331✔
1896
  taosMemoryFreeClear(param);
105,331!
1897
}
1898

1899
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
×
1900
  SSessionAggOperatorInfo* pInfo = pOper->info;
×
1901
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
1902
  SSessionWinodwPhysiNode* pPhynode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
×
1903
  pOper->status = OP_NOT_OPENED;
×
1904

1905
  resetBasicOperatorState(&pInfo->binfo);
×
1906
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
×
1907
                    pInfo->cleanGroupResInfo);
×
1908

1909
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
×
1910
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
1911
  if (code == 0) {
×
1912
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
1913
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
1914
                       &pTaskInfo->storageAPI.functionStore);
1915
  }
1916
  if (code == 0) {
×
1917
    code = resetExprSupp(&pInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
×
1918
                         &pTaskInfo->storageAPI.functionStore);
1919
  }
1920

1921
  pInfo->cleanGroupResInfo = false;
×
1922
  pInfo->winSup = (SWindowRowsSup){0};
×
1923
  pInfo->winSup.prevTs = INT64_MIN;
×
1924
  pInfo->reptScan = false;
×
1925

1926
  cleanupGroupResInfo(&pInfo->groupResInfo);
×
1927
  return code;
×
1928
}
1929

1930
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
105,331✔
1931
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1932
  QRY_PARAM_CHECK(pOptrInfo);
105,331!
1933

1934
  int32_t                  code = TSDB_CODE_SUCCESS;
105,331✔
1935
  int32_t                  lino = 0;
105,331✔
1936
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
105,331!
1937
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
105,331!
1938
  if (pInfo == NULL || pOperator == NULL) {
105,330!
1939
    code = terrno;
×
1940
    goto _error;
×
1941
  }
1942

1943
  pOperator->pPhyNode = pSessionNode;
105,330✔
1944
  pOperator->exprSupp.hasWindowOrGroup = true;
105,330✔
1945

1946
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
105,330✔
1947
  initResultSizeInfo(&pOperator->resultInfo, 4096);
105,330✔
1948

1949
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
105,330✔
1950
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
105,332!
1951
  initBasicInfo(&pInfo->binfo, pResBlock);
105,332✔
1952

1953
  int32_t    numOfCols = 0;
105,331✔
1954
  SExprInfo* pExprInfo = NULL;
105,331✔
1955
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
105,331✔
1956
  QUERY_CHECK_CODE(code, lino, _error);
105,332!
1957

1958
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
105,332✔
1959
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
105,332✔
1960
  QUERY_CHECK_CODE(code, lino, _error);
105,332!
1961

1962
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
105,332✔
1963
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
105,332✔
1964
  pInfo->gap = pSessionNode->gap;
105,332✔
1965

1966
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
105,332✔
1967
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
105,332✔
1968
  QUERY_CHECK_CODE(code, lino, _error);
105,332!
1969

1970
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
105,332✔
1971
  pInfo->binfo.pRes = pResBlock;
105,332✔
1972
  pInfo->winSup.prevTs = INT64_MIN;
105,332✔
1973
  pInfo->reptScan = false;
105,332✔
1974
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
105,332✔
1975
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
105,332✔
1976

1977
  if (pSessionNode->window.pExprs != NULL) {
105,332✔
1978
    int32_t    numOfScalar = 0;
167✔
1979
    SExprInfo* pScalarExprInfo = NULL;
167✔
1980
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
167✔
1981
    QUERY_CHECK_CODE(code, lino, _error);
167!
1982

1983
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
167✔
1984
    QUERY_CHECK_CODE(code, lino, _error);
167!
1985
  }
1986

1987
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
105,332✔
1988
                            pTaskInfo->pStreamRuntimeInfo);
105,332✔
1989
  QUERY_CHECK_CODE(code, lino, _error);
105,332!
1990

1991
  pInfo->pOperator = pOperator;
105,332✔
1992
  pInfo->cleanGroupResInfo = false;
105,332✔
1993
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
105,332✔
1994
                  pInfo, pTaskInfo);
1995
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
105,332✔
1996
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1997
  pOperator->pTaskInfo = pTaskInfo;
105,332✔
1998
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
105,332✔
1999

2000
  code = appendDownstream(pOperator, &downstream, 1);
105,332✔
2001
  QUERY_CHECK_CODE(code, lino, _error);
105,332!
2002

2003
  *pOptrInfo = pOperator;
105,332✔
2004
  return TSDB_CODE_SUCCESS;
105,332✔
2005

2006
_error:
×
2007
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2008
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2009
  pTaskInfo->code = code;
×
2010
  return code;
×
2011
}
2012

2013
void destroyMAIOperatorInfo(void* param) {
534,603✔
2014
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
534,603✔
2015
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
534,603✔
2016
  taosMemoryFreeClear(param);
534,608!
2017
}
534,607✔
2018

2019
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
339,902✔
2020
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
339,902✔
2021
  if (NULL == pResult) {
339,901!
2022
    return pResult;
×
2023
  }
2024
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
339,901✔
2025
  return pResult;
339,901✔
2026
}
2027

2028
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
61,871,400✔
2029
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2030
  if (*pResult == NULL) {
61,871,400✔
2031
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
339,902✔
2032
    if (*pResult == NULL) {
339,900!
2033
      return terrno;
×
2034
    }
2035
  }
2036

2037
  // set time window for current result
2038
  (*pResult)->win = (*win);
61,871,398✔
2039
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
61,871,398✔
2040
}
2041

2042
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
404,615✔
2043
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
2044
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
404,615✔
2045
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
404,615✔
2046

2047
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
404,615✔
2048
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
404,615✔
2049
  SInterval*     pInterval = &iaInfo->interval;
404,615✔
2050

2051
  int32_t  startPos = 0;
404,615✔
2052
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
404,615✔
2053

2054
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
404,612✔
2055

2056
  // there is an result exists
2057
  if (miaInfo->curTs != INT64_MIN) {
404,621✔
2058
    if (ts != miaInfo->curTs) {
52,464✔
2059
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
30,167✔
2060
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
30,167✔
2061
      miaInfo->curTs = ts;
30,167✔
2062
    }
2063
  } else {
2064
    miaInfo->curTs = ts;
352,157✔
2065
  }
2066

2067
  STimeWindow win = {0};
404,621✔
2068
  win.skey = miaInfo->curTs;
404,621✔
2069
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
404,621✔
2070

2071
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
404,621✔
2072
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
404,616!
2073
    T_LONG_JMP(pTaskInfo->env, ret);
1!
2074
  }
2075

2076
  int32_t currPos = startPos;
404,615✔
2077

2078
  STimeWindow currWin = win;
404,615✔
2079
  while (++currPos < pBlock->info.rows) {
96,784,675✔
2080
    if (tsCols[currPos] == miaInfo->curTs) {
96,377,732✔
2081
      continue;
34,920,247✔
2082
    }
2083

2084
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
61,457,485✔
2085
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
61,455,401✔
2086
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
61,455,401✔
2087
    if (ret != TSDB_CODE_SUCCESS) {
61,467,793!
2088
      T_LONG_JMP(pTaskInfo->env, ret);
×
2089
    }
2090

2091
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
61,467,793✔
2092
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
61,384,988✔
2093
    miaInfo->curTs = tsCols[currPos];
61,434,741✔
2094

2095
    currWin.skey = miaInfo->curTs;
61,434,741✔
2096
    currWin.ekey =
61,457,436✔
2097
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
61,434,741✔
2098

2099
    startPos = currPos;
61,457,436✔
2100
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
61,457,436✔
2101
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
61,459,443!
2102
      T_LONG_JMP(pTaskInfo->env, ret);
×
2103
    }
2104

2105
    miaInfo->curTs = currWin.skey;
61,459,813✔
2106
  }
2107

2108
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
406,943✔
2109
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
404,619✔
2110
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
404,619✔
2111
  if (ret != TSDB_CODE_SUCCESS) {
404,618!
2112
    T_LONG_JMP(pTaskInfo->env, ret);
×
2113
  }
2114
}
404,618✔
2115

2116
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
351,845✔
2117
  pRes->info.id.groupId = pMiaInfo->groupId;
351,845✔
2118
  pMiaInfo->curTs = INT64_MIN;
351,845✔
2119
  pMiaInfo->groupId = 0;
351,845✔
2120
}
351,845✔
2121

2122
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
612,737✔
2123
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
612,737✔
2124
  int32_t                               code = TSDB_CODE_SUCCESS;
612,737✔
2125
  int32_t                               lino = 0;
612,737✔
2126
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
612,737✔
2127
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
612,737✔
2128

2129
  SExprSupp*      pSup = &pOperator->exprSupp;
612,737✔
2130
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
612,737✔
2131
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
612,737✔
2132
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
612,737✔
2133

2134
  while (1) {
338,390✔
2135
    SSDataBlock* pBlock = NULL;
951,127✔
2136
    if (pMiaInfo->prefetchedBlock == NULL) {
951,127✔
2137
      pBlock = getNextBlockFromDownstream(pOperator, 0);
938,873✔
2138
    } else {
2139
      pBlock = pMiaInfo->prefetchedBlock;
12,254✔
2140
      pMiaInfo->prefetchedBlock = NULL;
12,254✔
2141

2142
      pMiaInfo->groupId = pBlock->info.id.groupId;
12,254✔
2143
    }
2144

2145
    // no data exists, all query processing is done
2146
    if (pBlock == NULL) {
951,121✔
2147
      // close last unclosed time window
2148
      if (pMiaInfo->curTs != INT64_MIN) {
534,258✔
2149
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
339,592✔
2150
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
339,590✔
2151
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
339,591✔
2152
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
339,591✔
2153
        QUERY_CHECK_CODE(code, lino, _end);
339,591!
2154
      }
2155

2156
      setOperatorCompleted(pOperator);
534,257✔
2157
      break;
534,263✔
2158
    }
2159

2160
    if (pMiaInfo->groupId == 0) {
416,863✔
2161
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
390,485✔
2162
        pMiaInfo->groupId = pBlock->info.id.groupId;
662✔
2163
        pRes->info.id.groupId = pMiaInfo->groupId;
662✔
2164
      }
2165
    } else {
2166
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
26,378✔
2167
        // if there are unclosed time window, close it firstly.
2168
        if (pMiaInfo->curTs == INT64_MIN) {
12,254!
2169
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2170
          T_LONG_JMP(pTaskInfo->env, terrno);
×
2171
        }
2172
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
12,254✔
2173
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
12,254✔
2174

2175
        pMiaInfo->prefetchedBlock = pBlock;
12,254✔
2176
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
12,254✔
2177
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
12,254✔
2178
        QUERY_CHECK_CODE(code, lino, _end);
12,254!
2179
        if (pRes->info.rows == 0) {
12,254✔
2180
          // After filtering for last group, the result is empty, so we need to continue to process next group
2181
          continue;
51✔
2182
        } else {
2183
          break;
12,203✔
2184
        }
2185
      } else {
2186
        // continue
2187
        pRes->info.id.groupId = pMiaInfo->groupId;
14,124✔
2188
      }
2189
    }
2190

2191
    pRes->info.scanFlag = pBlock->info.scanFlag;
404,609✔
2192
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
404,609✔
2193
    QUERY_CHECK_CODE(code, lino, _end);
404,615!
2194

2195
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
404,615✔
2196

2197
    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
404,618✔
2198
    QUERY_CHECK_CODE(code, lino, _end);
404,633!
2199

2200
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
404,633✔
2201
      break;
66,294✔
2202
    }
2203
  }
2204

2205
_end:
612,760✔
2206
  if (code != TSDB_CODE_SUCCESS) {
612,760!
2207
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2208
    pTaskInfo->code = code;
×
2209
    T_LONG_JMP(pTaskInfo->env, code);
×
2210
  }
2211
}
612,760✔
2212

2213
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,229,159✔
2214
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
1,229,159✔
2215
  int32_t                               code = TSDB_CODE_SUCCESS;
1,229,159✔
2216
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
1,229,159✔
2217
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
1,229,159✔
2218
  if (pOperator->status == OP_EXEC_DONE) {
1,229,159✔
2219
    (*ppRes) = NULL;
619,091✔
2220
    return code;
619,091✔
2221
  }
2222

2223
  SSDataBlock* pRes = iaInfo->binfo.pRes;
610,068✔
2224
  blockDataCleanup(pRes);
610,068✔
2225

2226
  if (iaInfo->binfo.mergeResultBlock) {
610,074✔
2227
    while (1) {
2228
      if (pOperator->status == OP_EXEC_DONE) {
531,077✔
2229
        break;
202,054✔
2230
      }
2231

2232
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
329,023✔
2233
        break;
62,151✔
2234
      }
2235

2236
      doMergeAlignedIntervalAgg(pOperator);
266,872✔
2237
    }
2238
  } else {
2239
    doMergeAlignedIntervalAgg(pOperator);
345,871✔
2240
  }
2241

2242
  size_t rows = pRes->info.rows;
610,092✔
2243
  pOperator->resultInfo.totalRows += rows;
610,092✔
2244
  (*ppRes) = (rows == 0) ? NULL : pRes;
610,092✔
2245
  return code;
610,092✔
2246
}
2247

2248
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
×
2249
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;
×
2250
  
2251
  uint64_t     groupId;  // current groupId
2252
  int64_t      curTs;    // current ts
2253
  SSDataBlock* prefetchedBlock;
2254
  SResultRow*  pResultRow;
2255

2256
  pInfo->groupId = 0;
×
2257
  pInfo->curTs = INT64_MIN;
×
2258
  pInfo->prefetchedBlock = NULL;
×
2259
  pInfo->pResultRow = NULL;
×
2260

2261
  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
×
2262
}
2263

2264
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
534,597✔
2265
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2266
  QRY_PARAM_CHECK(pOptrInfo);
534,597!
2267

2268
  int32_t                               code = TSDB_CODE_SUCCESS;
534,597✔
2269
  int32_t                               lino = 0;
534,597✔
2270
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
534,597!
2271
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
534,603!
2272
  if (miaInfo == NULL || pOperator == NULL) {
534,599!
2273
    code = terrno;
×
2274
    goto _error;
×
2275
  }
2276

2277
  pOperator->pPhyNode = pNode;
534,599✔
2278
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
534,599!
2279
  if (miaInfo->intervalAggOperatorInfo == NULL) {
534,600!
2280
    code = terrno;
×
2281
    goto _error;
×
2282
  }
2283

2284
  SInterval interval = {.interval = pNode->interval,
534,600✔
2285
                        .sliding = pNode->sliding,
534,600✔
2286
                        .intervalUnit = pNode->intervalUnit,
534,600✔
2287
                        .slidingUnit = pNode->slidingUnit,
534,600✔
2288
                        .offset = pNode->offset,
534,600✔
2289
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
534,600✔
2290
                        .timeRange = pNode->timeRange};
2291
  calcIntervalAutoOffset(&interval);
534,600✔
2292

2293
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
534,604✔
2294
  SExprSupp*                pSup = &pOperator->exprSupp;
534,604✔
2295
  pSup->hasWindowOrGroup = true;
534,604✔
2296

2297
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
534,604✔
2298
                            pTaskInfo->pStreamRuntimeInfo);
534,604✔
2299
  QUERY_CHECK_CODE(code, lino, _error);
534,602!
2300

2301
  miaInfo->curTs = INT64_MIN;
534,602✔
2302
  iaInfo->win = pTaskInfo->window;
534,602✔
2303
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
534,602✔
2304
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
534,602✔
2305
  iaInfo->interval = interval;
534,602✔
2306
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
534,602✔
2307
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
534,602✔
2308

2309
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
534,602✔
2310
  initResultSizeInfo(&pOperator->resultInfo, 512);
534,602✔
2311

2312
  int32_t    num = 0;
534,601✔
2313
  SExprInfo* pExprInfo = NULL;
534,601✔
2314
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
534,601✔
2315
  QUERY_CHECK_CODE(code, lino, _error);
534,603!
2316

2317
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
534,603✔
2318
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
534,603✔
2319
  QUERY_CHECK_CODE(code, lino, _error);
534,602!
2320

2321
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
534,602✔
2322
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
534,605!
2323
  initBasicInfo(&iaInfo->binfo, pResBlock);
534,605✔
2324
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
534,600✔
2325
  QUERY_CHECK_CODE(code, lino, _error);
534,604!
2326

2327
  iaInfo->timeWindowInterpo = false;
534,604✔
2328
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
534,604✔
2329
  QUERY_CHECK_CODE(code, lino, _error);
534,599!
2330
  if (iaInfo->timeWindowInterpo) {
534,599!
2331
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2332
  }
2333

2334
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
534,599✔
2335
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
534,593✔
2336
  QUERY_CHECK_CODE(code, lino, _error);
534,605!
2337
  iaInfo->pOperator = pOperator;
534,605✔
2338
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
534,605✔
2339
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2340

2341
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
534,604✔
2342
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2343
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
534,603✔
2344

2345
  code = appendDownstream(pOperator, &downstream, 1);
534,605✔
2346
  QUERY_CHECK_CODE(code, lino, _error);
534,605!
2347

2348
  *pOptrInfo = pOperator;
534,605✔
2349
  return TSDB_CODE_SUCCESS;
534,605✔
2350

2351
_error:
×
2352
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2353
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2354
  pTaskInfo->code = code;
×
2355
  return code;
×
2356
}
2357

2358
//=====================================================================================================================
2359
// merge interval operator
2360
typedef struct SMergeIntervalAggOperatorInfo {
2361
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2362
  SList*                   groupIntervals;
2363
  SListIter                groupIntervalsIter;
2364
  bool                     hasGroupId;
2365
  uint64_t                 groupId;
2366
  SSDataBlock*             prefetchedBlock;
2367
  bool                     inputBlocksFinished;
2368
} SMergeIntervalAggOperatorInfo;
2369

2370
typedef struct SGroupTimeWindow {
2371
  uint64_t    groupId;
2372
  STimeWindow window;
2373
} SGroupTimeWindow;
2374

2375
void destroyMergeIntervalOperatorInfo(void* param) {
×
2376
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2377
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2378
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2379

2380
  taosMemoryFreeClear(param);
×
2381
}
×
2382

2383
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2384
                                        STimeWindow* newWin) {
2385
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2386
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2387
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2388

2389
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2390
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2391
  if (code != TSDB_CODE_SUCCESS) {
×
2392
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2393
    return code;
×
2394
  }
2395

2396
  SListIter iter = {0};
×
2397
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2398
  SListNode* listNode = NULL;
×
2399
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2400
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2401
    if (prevGrpWin->groupId != tableGroupId) {
×
2402
      continue;
×
2403
    }
2404

2405
    STimeWindow* prevWin = &prevGrpWin->window;
×
2406
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2407
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2408
      taosMemoryFreeClear(tmp);
×
2409
    }
2410
  }
2411

2412
  return TSDB_CODE_SUCCESS;
×
2413
}
2414

2415
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
2416
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
2417
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2418
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2419

2420
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
×
2421
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;
×
2422

2423
  int32_t     startPos = 0;
×
2424
  int32_t     numOfOutput = pExprSup->numOfExprs;
×
2425
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
×
2426
  uint64_t    tableGroupId = pBlock->info.id.groupId;
×
2427
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2428
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
×
2429
  SResultRow* pResult = NULL;
×
2430

2431
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
×
2432
                                        iaInfo->binfo.inputTsOrder);
2433

2434
  int32_t ret =
2435
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2436
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2437
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2438
    T_LONG_JMP(pTaskInfo->env, ret);
×
2439
  }
2440

2441
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
×
2442
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2443
                                                 iaInfo->binfo.inputTsOrder);
2444
  if (forwardRows <= 0) {
×
2445
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2446
  }
2447

2448
  // prev time window not interpolation yet.
2449
  if (iaInfo->timeWindowInterpo) {
×
2450
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
×
2451
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
×
2452

2453
    // restore current time window
2454
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2455
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2456
    if (ret != TSDB_CODE_SUCCESS) {
×
2457
      T_LONG_JMP(pTaskInfo->env, ret);
×
2458
    }
2459

2460
    // window start key interpolation
2461
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
×
2462
    if (ret != TSDB_CODE_SUCCESS) {
×
2463
      T_LONG_JMP(pTaskInfo->env, ret);
×
2464
    }
2465
  }
2466

2467
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
×
2468
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2469
                                        forwardRows, pBlock->info.rows, numOfOutput);
×
2470
  if (ret != TSDB_CODE_SUCCESS) {
×
2471
    T_LONG_JMP(pTaskInfo->env, ret);
×
2472
  }
2473
  doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2474

2475
  // output previous interval results after this interval (&win) is closed
2476
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
×
2477
  if (code != TSDB_CODE_SUCCESS) {
×
2478
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2479
    T_LONG_JMP(pTaskInfo->env, code);
×
2480
  }
2481

2482
  STimeWindow nextWin = win;
×
2483
  while (1) {
×
2484
    int32_t prevEndPos = forwardRows - 1 + startPos;
×
2485
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
×
2486
                                      iaInfo->binfo.inputTsOrder);
2487
    if (startPos < 0) {
×
2488
      break;
×
2489
    }
2490

2491
    // null data, failed to allocate more memory buffer
2492
    code =
2493
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
×
2494
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2495
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2496
      T_LONG_JMP(pTaskInfo->env, code);
×
2497
    }
2498

2499
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
×
2500
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2501
                                           iaInfo->binfo.inputTsOrder);
2502

2503
    // window start(end) key interpolation
2504
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
×
2505
    if (code != TSDB_CODE_SUCCESS) {
×
2506
      T_LONG_JMP(pTaskInfo->env, code);
×
2507
    }
2508

2509
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
×
2510
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2511
                                           forwardRows, pBlock->info.rows, numOfOutput);
×
2512
    if (code != TSDB_CODE_SUCCESS) {
×
2513
      T_LONG_JMP(pTaskInfo->env, code);
×
2514
    }
2515
    doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2516

2517
    // output previous interval results after this interval (&nextWin) is closed
2518
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
×
2519
    if (code != TSDB_CODE_SUCCESS) {
×
2520
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2521
      T_LONG_JMP(pTaskInfo->env, code);
×
2522
    }
2523
  }
2524

2525
  if (iaInfo->timeWindowInterpo) {
×
2526
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
×
2527
  }
2528
}
×
2529

2530
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
2531
  int32_t        code = TSDB_CODE_SUCCESS;
×
2532
  int32_t        lino = 0;
×
2533
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
2534

2535
  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
×
2536
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2537
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;
×
2538

2539
  if (pOperator->status == OP_EXEC_DONE) {
×
2540
    (*ppRes) = NULL;
×
2541
    return code;
×
2542
  }
2543

2544
  SSDataBlock* pRes = iaInfo->binfo.pRes;
×
2545
  blockDataCleanup(pRes);
×
2546
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
×
2547
  QUERY_CHECK_CODE(code, lino, _end);
×
2548

2549
  if (!miaInfo->inputBlocksFinished) {
×
2550
    SOperatorInfo* downstream = pOperator->pDownstream[0];
×
2551
    while (1) {
×
2552
      SSDataBlock* pBlock = NULL;
×
2553
      if (miaInfo->prefetchedBlock == NULL) {
×
2554
        pBlock = getNextBlockFromDownstream(pOperator, 0);
×
2555
      } else {
2556
        pBlock = miaInfo->prefetchedBlock;
×
2557
        miaInfo->groupId = pBlock->info.id.groupId;
×
2558
        miaInfo->prefetchedBlock = NULL;
×
2559
      }
2560

2561
      if (pBlock == NULL) {
×
2562
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
×
2563
        miaInfo->inputBlocksFinished = true;
×
2564
        break;
×
2565
      }
2566

2567
      if (!miaInfo->hasGroupId) {
×
2568
        miaInfo->hasGroupId = true;
×
2569
        miaInfo->groupId = pBlock->info.id.groupId;
×
2570
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
×
2571
        miaInfo->prefetchedBlock = pBlock;
×
2572
        break;
×
2573
      }
2574

2575
      pRes->info.scanFlag = pBlock->info.scanFlag;
×
2576
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
×
2577
      QUERY_CHECK_CODE(code, lino, _end);
×
2578

2579
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);
×
2580

2581
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
×
2582
        break;
×
2583
      }
2584
    }
2585

2586
    pRes->info.id.groupId = miaInfo->groupId;
×
2587
  }
2588

2589
  if (miaInfo->inputBlocksFinished) {
×
2590
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
×
2591

2592
    if (listNode != NULL) {
×
2593
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
×
2594
      pRes->info.id.groupId = grpWin->groupId;
×
2595
    }
2596
  }
2597

2598
  if (pRes->info.rows == 0) {
×
2599
    setOperatorCompleted(pOperator);
×
2600
  }
2601

2602
  size_t rows = pRes->info.rows;
×
2603
  pOperator->resultInfo.totalRows += rows;
×
2604

2605
_end:
×
2606
  if (code != TSDB_CODE_SUCCESS) {
×
2607
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2608
    pTaskInfo->code = code;
×
2609
    T_LONG_JMP(pTaskInfo->env, code);
×
2610
  }
2611
  (*ppRes) = (rows == 0) ? NULL : pRes;
×
2612
  return code;
×
2613
}
2614

2615
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
×
2616
  SMergeIntervalAggOperatorInfo* pInfo = pOper->info;
×
2617

2618
  pInfo->hasGroupId = false;
×
2619
  pInfo->groupId = 0;
×
2620
  pInfo->prefetchedBlock = NULL;
×
2621
  pInfo->inputBlocksFinished = false;
×
2622
  tdListEmpty(pInfo->groupIntervals);
×
2623
  
2624
  SIntervalAggOperatorInfo* pIntervalInfo = &pInfo->intervalAggOperatorInfo;
×
2625
  return resetInterval(pOper, pIntervalInfo);
×
2626
}
2627

2628
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2629
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2630
  QRY_PARAM_CHECK(pOptrInfo);
×
2631

2632
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2633
  int32_t                        lino = 0;
×
2634
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2635
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2636
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2637
    code = terrno;
×
2638
    goto _error;
×
2639
  }
2640

2641
  pOperator->pPhyNode = pIntervalPhyNode;
×
2642
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2643
                        .sliding = pIntervalPhyNode->sliding,
×
2644
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2645
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2646
                        .offset = pIntervalPhyNode->offset,
×
2647
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2648
                        .timeRange = pIntervalPhyNode->timeRange};
2649
  calcIntervalAutoOffset(&interval);
×
2650

2651
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2652

2653
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2654
  pIntervalInfo->win = pTaskInfo->window;
×
2655
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2656
  pIntervalInfo->interval = interval;
×
2657
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2658
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2659
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2660

2661
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2662
  pExprSupp->hasWindowOrGroup = true;
×
2663

2664
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2665
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2666

2667
  int32_t    num = 0;
×
2668
  SExprInfo* pExprInfo = NULL;
×
2669
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2670
  QUERY_CHECK_CODE(code, lino, _error);
×
2671

2672
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2673
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2674
  if (code != TSDB_CODE_SUCCESS) {
×
2675
    goto _error;
×
2676
  }
2677

2678
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2679
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2680
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2681
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2682
  QUERY_CHECK_CODE(code, lino, _error);
×
2683

2684
  pIntervalInfo->timeWindowInterpo = false;
×
2685
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2686
  QUERY_CHECK_CODE(code, lino, _error);
×
2687
  if (pIntervalInfo->timeWindowInterpo) {
×
2688
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2689
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2690
      goto _error;
×
2691
    }
2692
  }
2693

2694
  pIntervalInfo->pOperator = pOperator;
×
2695
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2696
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2697
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2698
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2699
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2700
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
2701

2702
  code = appendDownstream(pOperator, &downstream, 1);
×
2703
  if (code != TSDB_CODE_SUCCESS) {
×
2704
    goto _error;
×
2705
  }
2706

2707
  *pOptrInfo = pOperator;
×
2708
  return TSDB_CODE_SUCCESS;
×
2709
_error:
×
2710
  if (pMergeIntervalInfo != NULL) {
×
2711
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2712
  }
2713
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2714
  pTaskInfo->code = code;
×
2715
  return code;
×
2716
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc