• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.91
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "tchecksum.h"
22
#include "tcommon.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "tfill.h"
26
#include "tglobal.h"
27
#include "tlog.h"
28
#include "ttime.h"
29

30
typedef struct SSessionAggOperatorInfo {
31
  SOptrBasicInfo     binfo;
32
  SAggSupporter      aggSup;
33
  SExprSupp          scalarSupp;  // supporter for perform scalar function
34
  SGroupResInfo      groupResInfo;
35
  SWindowRowsSup     winSup;
36
  bool               reptScan;  // next round scan
37
  int64_t            gap;       // session window gap
38
  int32_t            tsSlotId;  // primary timestamp slot id
39
  STimeWindowAggSupp twAggSup;
40
  SOperatorInfo*     pOperator;
41
  bool               cleanGroupResInfo;
42
} SSessionAggOperatorInfo;
43

44
typedef struct SStateWindowOperatorInfo {
45
  SOptrBasicInfo     binfo;
46
  SAggSupporter      aggSup;
47
  SExprSupp          scalarSup;
48
  SGroupResInfo      groupResInfo;
49
  SWindowRowsSup     winSup;
50
  SColumn            stateCol;  // start row index
51
  bool               hasKey;
52
  SStateKeys         stateKey;
53
  int32_t            tsSlotId;  // primary timestamp column slot id
54
  STimeWindowAggSupp twAggSup;
55
  SOperatorInfo*     pOperator;
56
  bool               cleanGroupResInfo;
57
} SStateWindowOperatorInfo;
58

59
// Selects which boundary of a result row an interpolation operation refers to.
typedef enum SResultTsInterpType {
  RESULT_ROW_START_INTERP = 1,  // the start boundary (startInterp flag / start point)
  RESULT_ROW_END_INTERP = 2,    // the end boundary (endInterp flag / end point)
} SResultTsInterpType;
63

64
typedef struct SOpenWindowInfo {
65
  SResultRowPosition pos;
66
  uint64_t           groupId;
67
} SOpenWindowInfo;
68

69
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
70

71
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
72
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
73
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
74

75
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
267,671,961✔
76
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
77
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
78
                                      SExecTaskInfo* pTaskInfo) {
79
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
267,671,961✔
80
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
81

82
  if (pResultRow == NULL || pTaskInfo->code != 0) {
268,605,675!
83
    *pResult = NULL;
×
84
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
UNCOV
85
    return pTaskInfo->code;
×
86
  }
87

88
  // set time window for current result
89
  pResultRow->win = (*win);
268,703,390✔
90

91
  *pResult = pResultRow;
268,703,390✔
92
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
268,703,390✔
93
}
94

95
/**
 * Account for one more row in the window currently tracked by `pRowSup`.
 *
 * @param pRowSup  window row tracker to update
 * @param ts       timestamp of the row; becomes both the window end key and prevTs
 * @param groupId  group id recorded on the tracker
 */
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
  pRowSup->numOfRows++;
}
135,561,298✔
101

102
/**
 * Reset `pRowSup` so that it describes a new window beginning at row `rowIndex`.
 *
 * @param pRowSup   window row tracker to reset
 * @param tsList    timestamp column; tsList[rowIndex] becomes the window start key
 * @param rowIndex  index of the first row of the new window
 * @param groupId   group id recorded on the tracker
 */
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
                                     uint64_t groupId) {
  const int64_t startTs = tsList[rowIndex];

  pRowSup->groupId = groupId;
  pRowSup->win.skey = startTs;
  pRowSup->startRowIndex = rowIndex;
  pRowSup->numOfRows = 0;  // no row has been counted for the new window yet
}
112,269,357✔
109

UNCOV
110
/**
 * Count how many rows, starting at `pos`, can be consumed for a window ending at `ekey`.
 *
 * `searchFn` is run on the sub-array beginning at pData[pos]; its result is the base count. Every row
 * from that position on whose timestamp equals `ekey` is counted as well.
 *
 * @param numOfRows  total number of rows in pData
 * @param searchFn   search routine applied to the remaining rows
 * @param ekey       key the search is performed for
 * @param pos        index of the first row to consider
 * @param order      scan order, forwarded to searchFn (the handling is the same for both orders)
 * @param pData      timestamp column
 * @return number of rows to move forward; 0 when searchFn reports a negative position
 */
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
  int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
  if (end < 0) {
    return 0;
  }

  int32_t forwardRows = end;

  // include the rows equal to ekey, but never read beyond the last row of the block
  for (int32_t cur = pos + end; cur < numOfRows && pData[cur] == ekey; ++cur) {
    forwardRows += 1;
  }

  return forwardRows;
}
146

147
/**
 * Binary search over an array of TSKEY values.
 *
 * @param pValue  array of TSKEY values (passed as char*)
 * @param num     number of elements in the array
 * @param key     key to look for
 * @param order   TSDB_ORDER_DESC selects the descending search; any other value the ascending one
 * @return a position in [0, num) as determined by the probes below, or -1 when num <= 0 or the computed
 *         position would be num or larger
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  TSKEY*  keys = (TSKEY*)pValue;
  int32_t lo = 0;
  int32_t hi = num - 1;
  int32_t mid = -1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    for (;;) {
      if (key >= keys[lo]) return lo;
      if (key == keys[hi]) return hi;

      if (key < keys[hi]) {
        hi += 1;
        return (hi >= num) ? -1 : hi;
      }

      mid = ((hi - lo + 1) >> 1) + lo;

      if (key < keys[mid]) {
        lo = mid + 1;
      } else if (key > keys[mid]) {
        hi = mid - 1;
      } else {
        return mid;  // exact hit on the middle element
      }
    }
  }

  // find the first position which is bigger than the key
  for (;;) {
    if (key <= keys[lo]) return lo;
    if (key == keys[hi]) return hi;

    if (key > keys[hi]) {
      hi += 1;
      return (hi >= num) ? -1 : hi;
    }

    mid = ((hi - lo + 1) >> 1) + lo;

    if (key < keys[mid]) {
      hi = mid - 1;
    } else if (key > keys[mid]) {
      lo = mid + 1;
    } else {
      return mid;  // exact hit on the middle element
    }
  }
}
215

216
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
257,137,478✔
217
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
218
  int32_t num = -1;
257,137,478✔
219
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
257,137,478✔
220

221
  if (order == TSDB_ORDER_ASC) {
257,137,478✔
222
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
188,845,271✔
223
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
184,865,931!
224
      if (item != NULL) {
187,460,813!
UNCOV
225
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
226
      }
227
    } else {
228
      num = pDataBlockInfo->rows - startPos;
3,979,340✔
229
      if (item != NULL) {
3,979,340!
UNCOV
230
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
231
      }
232
    }
233
  } else {  // desc
234
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
68,292,207!
235
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
66,745,028!
236
      if (item != NULL) {
66,824,056!
UNCOV
237
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
238
      }
239
    } else {
240
      num = pDataBlockInfo->rows - startPos;
1,547,179✔
241
      if (item != NULL) {
1,547,179!
UNCOV
242
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
243
      }
244
    }
245
  }
246

247
  return num;
259,811,388✔
248
}
249

250
/**
 * Compute the interpolated boundary value at `windowKey` for every expression for which
 * fmIsIntervalInterpoFunc() holds, and store it in the start or end point of its function context.
 *
 * Expressions that are not interval-interpolation functions only get start.key reset to INT64_MIN.
 *
 * @param pPrevValues   saved previous values; read when prevRowIndex == -1
 * @param pDataBlock    column array of the current block
 * @param prevTs        timestamp of the previous point
 * @param prevRowIndex  row of the previous point in the block, or -1 to take it from pPrevValues
 * @param curTs         timestamp of the current point
 * @param curRowIndex   row of the current point in the block
 * @param windowKey     timestamp at which the value is interpolated
 * @param type          RESULT_ROW_START_INTERP writes the start point, anything else the end point
 * @param pSup          expression support holding the function contexts
 */
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
  SqlFunctionCtx* pCtx = pSup->pCtx;

  // slot in pPrevValues of the next interpolating expression; advanced once per such expression
  int32_t prevValIdx = 1;

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
      pCtx[k].start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pParam = &pCtx[k].param[0];
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

    double prevVal = 0;
    double curVal = 0;
    double interpVal = 0;

    if (prevRowIndex == -1) {
      SGroupKeys* pPrev = taosArrayGet(pPrevValues, prevValIdx);
      GET_TYPED_DATA(prevVal, double, pColInfo->info.type, pPrev->pData);
    } else {
      GET_TYPED_DATA(prevVal, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex));
    }

    GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));

    SPoint prevPoint = (SPoint){.key = prevTs, .val = &prevVal};
    SPoint curPoint = (SPoint){.key = curTs, .val = &curVal};
    SPoint target = (SPoint){.key = windowKey, .val = &interpVal};

    // elapsed functions skip the linear interpolation, so interpVal stays 0 for them
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
      taosGetLinearInterpolationVal(&target, TSDB_DATA_TYPE_DOUBLE, &prevPoint, &curPoint, TSDB_DATA_TYPE_DOUBLE);
    }

    if (type == RESULT_ROW_START_INTERP) {
      pCtx[k].start.key = target.key;
      pCtx[k].start.val = interpVal;
    } else {
      pCtx[k].end.key = target.key;
      pCtx[k].end.val = interpVal;
    }

    prevValIdx += 1;
  }
}
18,801,679✔
319

320
/**
 * Mark one boundary of every function context as "no interpolation" by setting its key to INT64_MIN.
 *
 * @param pCtx         function contexts
 * @param numOfOutput  number of contexts
 * @param type         RESULT_ROW_START_INTERP resets start.key, any other value resets end.key
 */
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  const bool resetStart = (type == RESULT_ROW_START_INTERP);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (resetStart) {
      pCtx[i].start.key = INT64_MIN;
    } else {
      pCtx[i].end.key = INT64_MIN;
    }
  }
}
1,011,517✔
331

332
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
9,895,493✔
333
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
334
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
9,895,493✔
335

336
  TSKEY curTs = tsCols[pos];
9,895,493✔
337

338
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
9,895,493✔
339
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
9,895,425✔
340

341
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
342
  // start exactly from this point, no need to do interpolation
343
  TSKEY key = ascQuery ? win->skey : win->ekey;
9,895,425!
344
  if (key == curTs) {
9,895,425✔
345
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
526,861✔
346
    return true;
526,861✔
347
  }
348

349
  // it is the first time window, no need to do interpolation
350
  if (pTsKey->isNull && pos == 0) {
9,368,564!
351
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
159,495✔
352
  } else {
353
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
9,209,069!
354
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
9,209,069✔
355
                              RESULT_ROW_START_INTERP, pSup);
356
  }
357

358
  return true;
9,368,702✔
359
}
360

361
/**
 * Prepare the end-boundary interpolation of window `win`.
 *
 * @param pInfo         interval operator info (input order, previous values)
 * @param pSup          expression support holding the function contexts
 * @param endRowIndex   index of the last row of the window in this block
 * @param nextRowIndex  index of the row following the window
 * @param pDataBlock    column array of the current block
 * @param tsCols        timestamp column of the block
 * @param blockEkey     key of the block boundary in scan direction
 * @param win           the time window
 * @param pRes          [out] false when the window does not end inside this block, true when the end
 *                      boundary has been handled; not written on error
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when nextRowIndex is negative
 */
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                            int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
                                            TSKEY blockEkey, STimeWindow* win, bool* pRes) {
  const int32_t order = pInfo->binfo.inputTsOrder;

  TSKEY actualEndKey = tsCols[endRowIndex];
  TSKEY boundary = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;

  // the window continues beyond this block: the end value cannot be produced yet
  const bool beyondBlock =
      (boundary > blockEkey && (order == TSDB_ORDER_ASC)) || (boundary < blockEkey && (order == TSDB_ORDER_DESC));

  // either not finished here, or a real row sits exactly on the boundary: no interpolation in both cases
  if (beyondBlock || boundary == actualEndKey) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    (*pRes) = !beyondBlock;
    return TSDB_CODE_SUCCESS;
  }

  if (nextRowIndex < 0) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  TSKEY nextKey = tsCols[nextRowIndex];
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, boundary,
                            RESULT_ROW_END_INTERP, pSup);
  (*pRes) = true;
  return TSDB_CODE_SUCCESS;
}
396

397
/**
 * Decide whether window `pWin` has to be calculated with respect to the range [calStart, calEnd].
 *
 * @param pInterval  interval definition; when interval == sliding every window qualifies
 * @param pWin       window to test
 * @param calStart   start of the calculation range
 * @param calEnd     end of the calculation range
 * @param blockType  block type; STREAM_PULL_DATA additionally rejects windows starting before calStart
 * @return false when interval != sliding and the window lies outside the range (or, for pull data,
 *         starts before calStart); true otherwise
 */
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
  if (pInterval->interval == pInterval->sliding) {
    return true;
  }

  const bool outsideRange = (pWin->ekey < calStart) || (pWin->skey > calEnd);
  const bool pullBeforeStart = (blockType == STREAM_PULL_DATA) && (pWin->skey < calStart);

  return !(outsideRange || pullBeforeStart);
}
405

406
/**
 * Convenience wrapper around inCalSlidingWindow() that takes the calculation range and the block type
 * from `pBlockInfo`.
 */
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  const TSKEY calStart = pBlockInfo->calWin.skey;
  const TSKEY calEnd = pBlockInfo->calWin.ekey;

  return inCalSlidingWindow(pInterval, pWin, calStart, calEnd, pBlockInfo->type);
}
409

410
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
258,410,770✔
411
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
412
  bool ascQuery = (order == TSDB_ORDER_ASC);
258,410,770✔
413

414
  int32_t precision = pInterval->precision;
258,410,770✔
415
  getNextTimeWindow(pInterval, pNext, order);
258,410,770✔
416

417
  // next time window is not in current block
418
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
258,589,769!
419
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
256,412,416✔
420
    return -1;
2,775,559✔
421
  }
422

423
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
255,814,210!
424
    return -1;
30✔
425
  }
426

427
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
255,957,491✔
428
  int32_t startPos = 0;
255,957,491✔
429

430
  // tumbling time window query, a special case of sliding time window query
431
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
255,957,491✔
432
    startPos = prevPosition + 1;
127,724,331✔
433
  } else {
434
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
128,233,160✔
435
      startPos = 0;
3,139,155✔
436
    } else {
437
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
125,094,005✔
438
    }
439
  }
440

441
  /* interp query with fill should not skip time window */
442
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
443
  //    return startPos;
444
  //  }
445

446
  /*
447
   * This time window does not cover any data, try next time window,
448
   * this case may happen when the time window is too small
449
   */
450
  if (primaryKeys != NULL) {
256,198,678!
451
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
325,994,263✔
452
      TSKEY next = primaryKeys[startPos];
69,739,728✔
453
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
69,739,728✔
454
        pNext->skey = taosTimeTruncate(next, pInterval);
42,467✔
455
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
2✔
456
      } else {
457
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
69,697,261✔
458
        pNext->skey = pNext->ekey - pInterval->interval + 1;
69,697,261✔
459
      }
460
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
186,542,472✔
461
      TSKEY next = primaryKeys[startPos];
29,672,075✔
462
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
29,672,075!
463
        pNext->skey = taosTimeTruncate(next, pInterval);
×
UNCOV
464
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
465
      } else {
466
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
29,674,210✔
467
        pNext->ekey = pNext->skey + pInterval->interval - 1;
29,674,210✔
468
      }
469
    }
470
  }
471

472
  return startPos;
256,173,148✔
473
}
474

475
/**
 * Report whether the requested boundary of `pResult` is already flagged as interpolated.
 *
 * @param pResult  result row to inspect
 * @param type     RESULT_ROW_START_INTERP checks startInterp, any other value checks endInterp
 */
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
  return (type == RESULT_ROW_START_INTERP) ? (pResult->startInterp == true) : (pResult->endInterp == true);
}
482

483
/**
 * Flag the requested boundary of `pResult` as interpolated.
 *
 * @param pResult  result row to update
 * @param type     RESULT_ROW_START_INTERP sets startInterp, any other value sets endInterp
 */
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  if (type != RESULT_ROW_START_INTERP) {
    pResult->endInterp = true;
    return;
  }

  pResult->startInterp = true;
}
19,629,255✔
490

491
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
242,285,080✔
492
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
493
  int32_t code = TSDB_CODE_SUCCESS;
242,285,080✔
494
  int32_t lino = 0;
242,285,080✔
495
  if (!pInfo->timeWindowInterpo) {
242,285,080✔
496
    return code;
232,516,096✔
497
  }
498

499
  if (pBlock == NULL) {
9,768,984!
500
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
501
    return code;
×
502
  }
503

504
  if (pBlock->pDataBlock == NULL) {
9,768,984!
UNCOV
505
    return code;
×
506
  }
507

508
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
9,768,984✔
509

510
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
9,906,462✔
511
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
9,906,462✔
512
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
9,906,441✔
513
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
9,895,495✔
514
    if (interp) {
9,895,531!
515
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
9,895,534✔
516
    }
517
  } else {
518
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
10,946✔
519
  }
520

521
  // point interpolation does not require the end key time window interpolation.
522
  // interpolation query does not generate the time window end interpolation
523
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
9,906,465✔
524
  if (!done) {
9,906,460✔
525
    int32_t endRowIndex = startPos + forwardRows - 1;
9,906,459✔
526
    int32_t nextRowIndex = endRowIndex + 1;
9,906,459✔
527

528
    // duplicated ts row does not involve in the interpolation of end value for current time window
529
    int32_t x = endRowIndex;
9,906,459✔
530
    while (x > 0) {
9,906,477✔
531
      if (tsCols[x] == tsCols[x - 1]) {
9,853,610✔
532
        x -= 1;
18✔
533
      } else {
534
        endRowIndex = x;
9,853,592✔
535
        break;
9,853,592✔
536
      }
537
    }
538

539
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
9,906,459!
540
    bool  interp = false;
9,906,459✔
541
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols,
9,906,459✔
542
                                           endKey, win, &interp);
543
    QUERY_CHECK_CODE(code, lino, _end);
9,906,512!
544
    if (interp) {
9,906,512✔
545
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
9,734,027✔
546
    }
547
  } else {
548
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
1✔
549
  }
550

551
_end:
9,906,508✔
552
  if (code != TSDB_CODE_SUCCESS) {
9,906,508!
UNCOV
553
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
554
  }
555
  return code;
9,906,505✔
556
}
557

558
/**
 * For every entry of `pPrevKeys`, copy the last non-NULL value of the matching column of `pBlock` into
 * the entry's buffer. Entries whose column holds only NULL values are left untouched.
 *
 * @param pPrevKeys  array of SGroupKeys receiving the values
 * @param pBlock     source data block; nothing is done when it has no column array
 * @param pCols      array of SColumn; entry k gives the slot id of the column for pPrevKeys[k]
 */
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  size_t numOfKeys = taosArrayGetSize(pPrevKeys);
  for (int32_t k = 0; k < numOfKeys; ++k) {
    SColumn*         pCol = taosArrayGet(pCols, k);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    SGroupKeys*      pKey = taosArrayGet(pPrevKeys, k);

    // scan backwards for the last row that carries a value
    int32_t row = pBlock->info.rows - 1;
    while (row >= 0 && colDataIsNull_s(pColInfo, row)) {
      --row;
    }

    if (row < 0) {
      continue;
    }

    char* val = colDataGetData(pColInfo, row);
    if (IS_VAR_DATA_TYPE(pKey->type)) {
      memcpy(pKey->pData, val, varDataTLen(val));
    } else {
      memcpy(pKey->pData, val, pKey->bytes);
    }
  }
}
586

587
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
62,367✔
588
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
589
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
62,367✔
590

591
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
62,367✔
592
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
62,367✔
593

594
  int32_t startPos = 0;
62,367✔
595
  int32_t numOfOutput = pSup->numOfExprs;
62,367✔
596

597
  SResultRow* pResult = NULL;
62,367✔
598

UNCOV
599
  while (1) {
×
600
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
62,367✔
601
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
62,367✔
602
    uint64_t            groupId = pOpenWin->groupId;
62,367✔
603
    SResultRowPosition* p1 = &pOpenWin->pos;
62,367✔
604
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
62,367!
605
      break;
62,367✔
606
    }
607

608
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
609
    if (NULL == pr) {
×
UNCOV
610
      T_LONG_JMP(pTaskInfo->env, terrno);
×
611
    }
612

613
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
614
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
615
      T_LONG_JMP(pTaskInfo->env, terrno);
×
616
    }
617

618
    if (pr->closed) {
×
619
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
620
             isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)) ) {
×
621
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
622
        T_LONG_JMP(pTaskInfo->env, terrno);
×
623
      }
624
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
625
      taosMemoryFree(pNode);
×
UNCOV
626
      continue;
×
627
    }
628

629
    STimeWindow w = pr->win;
×
UNCOV
630
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
631
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
632
    if (ret != TSDB_CODE_SUCCESS) {
×
UNCOV
633
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
×
634
    }
635

636
    if(isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
637
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
638
      T_LONG_JMP(pTaskInfo->env, terrno);
×
639
    }
640

641
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
642
    if (!pTsKey) {
×
643
      pTaskInfo->code = terrno;
×
UNCOV
644
      T_LONG_JMP(pTaskInfo->env, terrno);
×
645
    }
646

647
    int64_t     prevTs = *(int64_t*)pTsKey->pData;
×
648
    if (groupId == pBlock->info.id.groupId) {
×
649
      TSKEY curTs = pBlock->info.window.skey;
×
650
      if (tsCols != NULL) {
×
UNCOV
651
        curTs = tsCols[startPos];
×
652
      }
UNCOV
653
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
654
                                RESULT_ROW_END_INTERP, pSup);
655
    }
656

657
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
UNCOV
658
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
659

660
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
661
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
662
                                          pBlock->info.rows, numOfExprs);
×
663
    if (ret != TSDB_CODE_SUCCESS) {
×
UNCOV
664
      T_LONG_JMP(pTaskInfo->env, ret);
×
665
    }
666

667
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
668
      closeResultRow(pr);
×
669
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
UNCOV
670
      taosMemoryFree(pNode);
×
671
    } else {  // the remains are can not be closed yet.
UNCOV
672
      break;
×
673
    }
674
  }
675
}
62,367✔
676

677
static bool tsKeyCompFn(void* l, void* r, void* param) {
12,248,789✔
678
  TSKEY*                    lTS = (TSKEY*)l;
12,248,789✔
679
  TSKEY*                    rTS = (TSKEY*)r;
12,248,789✔
680
  SIntervalAggOperatorInfo* pInfo = param;
12,248,789✔
681
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
12,248,789✔
682
}
683

684
/**
 * Check whether a result row for the window starting at win->skey in group `tableGroupId` is already
 * present in the operator's result row hash table.
 *
 * @return true when the hash table contains an entry for the (window start, group id) key
 */
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);

  void* pEntry = tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
  return pEntry != NULL;
}
689

690
/**
691
 * @brief check if cur window should be filtered out by limit info
692
 * @retval true if should be filtered out
693
 * @retval false if not filtering out
694
 * @note If no limit info, we skip filtering.
695
 *       If input/output ts order mismatch, we skip filtering too.
696
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
697
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
698
 *       every tuple in every block.
699
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
700
 */
701
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId, SExecTaskInfo* pTaskInfo) {
243,338,052✔
702
  int32_t code = TSDB_CODE_SUCCESS;
243,338,052✔
703
  int32_t lino = 0;
243,338,052✔
704
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
243,338,052✔
705
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
10,777,958✔
706
      // if input/output ts order mismatch, no filter
707
  ) {
708
    return false;
239,639,623✔
709
  }
710

711
  if (pOperatorInfo->limit == 0) return true;
3,698,429✔
712

713
  if (pOperatorInfo->pBQ == NULL) {
3,697,813✔
714
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosMemoryFree, pOperatorInfo);
22,205✔
715
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
22,198!
716
  }
717

718
  bool shouldFilter = false;
3,697,806✔
719
  // if BQ has been full, compare it with top of BQ
720
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
3,697,806✔
721
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
1,586,578✔
722
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
1,584,723✔
723
  }
724
  if (shouldFilter) {
3,665,332✔
725
    return true;
94,710✔
726
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
3,570,622✔
727
    return false;
2,062,235✔
728
  }
729

730
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
731
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
1,574,904✔
732
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
1,617,657!
733

734
  *((TSKEY*)node.data) = win->skey;
1,617,657✔
735

736
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
1,617,657!
737
    taosMemoryFree(node.data);
×
UNCOV
738
    return true;
×
739
  }
740

741
_end:
1,500,374✔
742
  if (code != TSDB_CODE_SUCCESS) {
1,500,374!
743
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
744
    pTaskInfo->code = code;
×
UNCOV
745
    T_LONG_JMP(pTaskInfo->env, code);
×
746
  }
747
  return false;
1,500,374✔
748
}
749

750
/**
 * Aggregate one input block into every interval window it overlaps.
 *
 * The first window is derived from the block's start timestamp; the remaining windows are found by
 * repeatedly calling getNextQualifiedWindow() until the block is exhausted or a window is rejected
 * by filterWindowWithLimit(). When pInfo->timeWindowInterpo is set, the still-open windows are
 * interpolated first and the last row of the block is saved for the next call.
 *
 * @param pOperatorInfo  the interval operator
 * @param pResultRowInfo result row bookkeeping (including the open window list)
 * @param pBlock         input data block
 * @param scanFlag       scan flag of the block; compared with MAIN_SCAN for the output buffer setup
 * @return true when the block belongs to a new group beyond the slimit (the caller stops reading),
 *         false otherwise
 * @note Errors are raised through T_LONG_JMP on pTaskInfo->env; the function does not return then.
 */
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  // a new group starts: enforce slimit and drop the bounded queue of the previous group
  if (tableGroupId != pInfo->curGroupId) {
    pInfo->handledGroupNum += 1;
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
      return true;
    }
    pInfo->curGroupId = tableGroupId;
    destroyBoundedQueue(pInfo->pBQ);
    pInfo->pBQ = NULL;
  }

  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;

  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // report the real failure; fall back to OOM only when no result row came back without an error code
    int32_t err = (ret != TSDB_CODE_SUCCESS) ? ret : TSDB_CODE_OUT_OF_MEMORY;
    T_LONG_JMP(pTaskInfo->env, err);
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 pInfo->binfo.inputTsOrder);

  // prev time window not interpolation yet.
  if (pInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                        pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  doCloseWindow(pResultRowInfo, pInfo, pResult);

  // walk the remaining windows covered by this block
  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      pInfo->binfo.inputTsOrder);
    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
      break;
    }

    // null data, failed to allocate more memory buffer
    ret = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                 pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
      int32_t err = (ret != TSDB_CODE_SUCCESS) ? ret : TSDB_CODE_OUT_OF_MEMORY;
      T_LONG_JMP(pTaskInfo->env, err);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           pInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // TODO: add to open window? how to close the open windows after input blocks exhausted?
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                          pBlock->info.rows, numOfOutput);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    doCloseWindow(pResultRowInfo, pInfo, pResult);
  }

  // remember the last row so the next block can interpolate against it
  if (pInfo->timeWindowInterpo) {
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }
  return false;
}
865

866
/**
 * Close a result row whose end boundary has been interpolated.
 *
 * Does nothing unless time-window interpolation is enabled and the row carries the
 * RESULT_ROW_END_INTERP mark. Otherwise the row is closed and the head node of the open window
 * list is popped and freed.
 */
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }
  if (!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    return;
  }

  // current result is done in computing final results.
  closeResultRow(pResult);
  SListNode* pHead = tdListPopHead(pResultRowInfo->openWindow);
  taosMemoryFree(pHead);
}
243,336,714✔
874

875
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
62,367✔
876
                                       SExecTaskInfo* pTaskInfo) {
877
  int32_t         code = TSDB_CODE_SUCCESS;
62,367✔
878
  int32_t         lino = 0;
62,367✔
879
  SOpenWindowInfo openWin = {0};
62,367✔
880
  openWin.pos.pageId = pResult->pageId;
62,367✔
881
  openWin.pos.offset = pResult->offset;
62,367✔
882
  openWin.groupId = groupId;
62,367✔
883
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
62,367✔
884
  if (pn == NULL) {
62,367✔
885
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
60,143✔
886
    QUERY_CHECK_CODE(code, lino, _end);
60,143!
887
    return openWin.pos;
60,143✔
888
  }
889

890
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
2,224✔
891
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
2,224!
892
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
UNCOV
893
    QUERY_CHECK_CODE(code, lino, _end);
×
894
  }
895

896
_end:
2,224✔
897
  if (code != TSDB_CODE_SUCCESS) {
2,224!
898
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
899
    pTaskInfo->code = code;
×
UNCOV
900
    T_LONG_JMP(pTaskInfo->env, code);
×
901
  }
902
  return openWin.pos;
2,224✔
903
}
904

905
/**
 * Return the data buffer of the primary timestamp column of a block.
 *
 * When the block carries loaded column data, the primary ts column is looked up by
 * pInfo->primaryTsIndex. If the first timestamp is non-zero and the block window is still
 * {0, 0}, the window is refreshed through blockDataUpdateTsWindow(). A first timestamp of zero
 * only produces a warning.
 *
 * @param pBlock    input data block
 * @param pInfo     interval operator info providing the primary ts column index
 * @param pTaskInfo task that receives the error code and provides the jump buffer on failure
 * @return pointer to the timestamp values, or NULL when the block has no loaded column data
 * @note A missing column or a failed window update is raised through T_LONG_JMP.
 */
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  TSKEY* tsCols = NULL;

  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return tsCols;
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  if (!pColDataInfo) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  tsCols = (int64_t*)pColDataInfo->pData;

  // tsCols[0] and tsCols[rows - 1] are read below: skip the inspection when there is nothing to read
  if (tsCols == NULL || pBlock->info.rows <= 0) {
    return tsCols;
  }

  if (tsCols[0] == 0) {
    qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0], tsCols[pBlock->info.rows - 1]);
  }

  if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
    int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
932

933
/**
 * Open the interval operator: consume every downstream block and build the window results.
 *
 * Each block optionally goes through the scalar expressions, is bound as aggregate input and is
 * then aggregated by hashIntervalAgg(). Reading stops when the downstream is exhausted or
 * hashIntervalAgg() returns true. Afterwards the grouped result info is initialised, the operator
 * is marked as opened and the open cost is recorded.
 *
 * @return TSDB_CODE_SUCCESS (also when the operator is already opened)
 * @note Any failure is logged, stored in pTaskInfo->code and raised through T_LONG_JMP.
 */
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                pSup = &pOperator->exprSupp;

  int32_t scanFlag = MAIN_SCAN;
  int64_t st = taosGetTimestampUs();

  // stays false until the grouped result info below has been initialised
  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
  }

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  OPTR_SET_OPENED(pOperator);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
986

987
/**
 * Split one block into state windows and feed each window to the aggregate functions.
 *
 * Rows whose state column is NULL are skipped. A row opens the first window when the group changes
 * or no state key has been stored yet; a row with the same state value extends the current window;
 * a row with a different state value flushes the current window and opens a new one. After the
 * loop the window that is still being collected is flushed with the last timestamp of the block
 * as its end key.
 *
 * @param pOperator the state window operator
 * @param pInfo     operator state (state key, row collector, aggregate support)
 * @param pBlock    input data block
 * @note A missing column or a failing helper is raised through T_LONG_JMP.
 */
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
  if (!pStateColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }
  // kept unsigned: it is compared with pRowSup->groupId and read from the block's groupId as-is
  uint64_t gid = pBlock->info.id.groupId;

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int32_t bytes = pStateColInfoData->info.bytes;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (!pColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  // the per-column aggregate info does not change inside the loop, so it is resolved once
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
      continue;
    }

    char* val = colDataGetData(pStateColInfoData, j);

    if (gid != pRowSup->groupId || !pInfo->hasKey) {
      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }

      pInfo->hasKey = true;

      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
    } else if (compareVal(val, &pInfo->stateKey)) {
      doKeepTuple(pRowSup, tsList[j], gid);
    } else {  // a new state window started
      SResultRow* pResult = NULL;

      // keep the time window for the closed time window.
      STimeWindow window = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
        T_LONG_JMP(pTaskInfo->env, ret);
      }

      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
      if (ret != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, ret);
      }

      // here we start a new session window
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);

      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }
    }
  }

  // an empty block has no last timestamp to read and nothing to flush
  if (pBlock->info.rows <= 0) {
    return;
  }

  // flush the window that is still being collected
  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
70,336✔
1084

1085
/**
 * Open the state window operator: consume every downstream block and build the window results.
 *
 * For each block the aggregate input is bound, the block ts window is refreshed, the optional
 * scalar expressions are evaluated and the block is handed to doStateWindowAggImpl(). When the
 * downstream is exhausted the open cost is recorded, the grouped result info is initialised and
 * the operator status becomes OP_RES_TO_RETURN.
 *
 * @return TSDB_CODE_SUCCESS (also when the operator is already opened)
 * @note Any failure is logged, stored in pTaskInfo->code and raised through T_LONG_JMP.
 */
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = pInfo->binfo.inputTsOrder;
  int64_t    st = taosGetTimestampUs();

  // stays false until the grouped result info below has been initialised
  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      // keep the result in the local code so that a success never overwrites pTaskInfo->code
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doStateWindowAggImpl(pOperator, pInfo, pBlock);
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;
  pOperator->status = OP_RES_TO_RETURN;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1140

1141
/**
 * Produce the next result block of the state window operator.
 *
 * The operator is opened through its _openFn, the result block capacity is ensured, and result
 * blocks are built and filtered until one contains rows or no results remain (in which case the
 * operator is marked completed).
 *
 * @param pOperator the state window operator
 * @param ppRes     receives the result block, or NULL when the block holds no rows or the operator
 *                  is already done
 * @return TSDB_CODE_SUCCESS
 * @note Any failure is logged, stored in pTaskInfo->code and raised through T_LONG_JMP.
 */
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  for (;;) {
    doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // nothing left to emit: the operator is finished after this block
    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // results remain; hand out this block only if the filter left some rows in it
    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  *ppRes = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
  return code;
}
1186

1187
/**
 * Produce the next result block of the interval operator.
 *
 * The operator is opened through its _openFn, then result blocks are built and filtered until one
 * contains rows or no results remain (in which case the operator is marked completed).
 *
 * @param pOperator the interval operator
 * @param ppRes     receives the result block, or NULL when the block holds no rows or the operator
 *                  is already done
 * @return TSDB_CODE_SUCCESS
 * @note Any failure is logged, stored in pTaskInfo->code and raised through T_LONG_JMP.
 */
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  size_t                    rows = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;

  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return code;
  }

  SSDataBlock* pBlock = pInfo->binfo.pRes;
  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // nothing left to emit: the operator is finished after this block
    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // results remain; hand out this block only if the filter left some rows in it
    if (pBlock->info.rows > 0) {
      break;
    }
  }

  rows = pBlock->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  *ppRes = (rows == 0) ? NULL : pBlock;
  return code;
}
1230

1231
static void destroyStateWindowOperatorInfo(void* param) {
71,098✔
1232
  if (param == NULL) {
71,098!
UNCOV
1233
    return;
×
1234
  }
1235
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
71,098✔
1236
  cleanupBasicInfo(&pInfo->binfo);
71,098✔
1237
  taosMemoryFreeClear(pInfo->stateKey.pData);
71,098!
1238
  if (pInfo->pOperator) {
71,099!
1239
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
71,099✔
1240
                      pInfo->cleanGroupResInfo);
71,099✔
1241
    pInfo->pOperator = NULL;
71,099✔
1242
  }
1243

1244
  cleanupExprSupp(&pInfo->scalarSup);
71,099✔
1245
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
71,099✔
1246
  cleanupAggSup(&pInfo->aggSup);
71,099✔
1247
  cleanupGroupResInfo(&pInfo->groupResInfo);
71,099✔
1248

1249
  taosMemoryFreeClear(param);
71,099!
1250
}
1251

1252
static void freeItem(void* param) {
188,923✔
1253
  SGroupKeys* pKey = (SGroupKeys*)param;
188,923✔
1254
  taosMemoryFree(pKey->pData);
188,923✔
1255
}
188,922✔
1256

1257
void destroyIntervalOperatorInfo(void* param) {
2,003,364✔
1258
  if (param == NULL) {
2,003,364!
UNCOV
1259
    return;
×
1260
  }
1261

1262
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
2,003,364✔
1263

1264
  cleanupBasicInfo(&pInfo->binfo);
2,003,364✔
1265
  if (pInfo->pOperator) {
2,003,440!
1266
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
2,003,448✔
1267
                      pInfo->cleanGroupResInfo);
2,003,448✔
1268
    pInfo->pOperator = NULL;
2,003,179✔
1269
  }
1270

1271
  cleanupAggSup(&pInfo->aggSup);
2,003,171✔
1272
  cleanupExprSupp(&pInfo->scalarSupp);
2,003,480✔
1273

1274
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
2,003,412✔
1275

1276
  taosArrayDestroy(pInfo->pInterpCols);
2,003,403✔
1277
  pInfo->pInterpCols = NULL;
2,003,368✔
1278

1279
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
2,003,368✔
1280
  pInfo->pPrevValues = NULL;
2,003,327✔
1281

1282
  cleanupGroupResInfo(&pInfo->groupResInfo);
2,003,327✔
1283
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
2,003,416✔
1284
  destroyBoundedQueue(pInfo->pBQ);
2,003,492✔
1285
  taosMemoryFreeClear(param);
2,003,374!
1286
}
1287

1288
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
1,999,515✔
1289
                                   bool* pRes) {
1290
  // the primary timestamp column
1291
  bool    needed = false;
1,999,515✔
1292
  int32_t code = TSDB_CODE_SUCCESS;
1,999,515✔
1293
  int32_t lino = 0;
1,999,515✔
1294
  void*   tmp = NULL;
1,999,515✔
1295

1296
  for (int32_t i = 0; i < numOfCols; ++i) {
6,540,045✔
1297
    SExprInfo* pExpr = pCtx[i].pExpr;
4,633,671✔
1298
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
4,633,671✔
1299
      needed = true;
94,087✔
1300
      break;
94,087✔
1301
    }
1302
  }
1303

1304
  if (needed) {
2,000,461✔
1305
    pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
94,087✔
1306
    QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
94,087!
1307

1308
    pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
94,087✔
1309
    QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
94,087!
1310

1311
    {  // ts column
1312
      SColumn c = {0};
94,087✔
1313
      c.colId = 1;
94,087✔
1314
      c.slotId = pInfo->primaryTsIndex;
94,087✔
1315
      c.type = TSDB_DATA_TYPE_TIMESTAMP;
94,087✔
1316
      c.bytes = sizeof(int64_t);
94,087✔
1317
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
94,087✔
1318
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
94,087!
1319

1320
      SGroupKeys key;
1321
      key.bytes = c.bytes;
94,087✔
1322
      key.type = c.type;
94,087✔
1323
      key.isNull = true;  // to denote no value is assigned yet
94,087✔
1324
      key.pData = taosMemoryCalloc(1, c.bytes);
94,087✔
1325
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
94,087!
1326

1327
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
94,087✔
1328
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
94,087!
1329
    }
1330
  }
1331

1332
  for (int32_t i = 0; i < numOfCols; ++i) {
6,632,577✔
1333
    SExprInfo* pExpr = pCtx[i].pExpr;
4,629,423✔
1334

1335
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
4,629,423✔
1336
      SFunctParam* pParam = &pExpr->base.pParam[0];
94,623✔
1337

1338
      SColumn c = *pParam->pCol;
94,623✔
1339
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
94,623✔
1340
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
94,839!
1341

1342
      SGroupKeys key = {0};
94,839✔
1343
      key.bytes = c.bytes;
94,839✔
1344
      key.type = c.type;
94,839✔
1345
      key.isNull = false;
94,839✔
1346
      key.pData = taosMemoryCalloc(1, c.bytes);
94,839✔
1347
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
94,839!
1348

1349
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
94,839✔
1350
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
94,839!
1351
    }
1352
  }
1353

1354
_end:
2,003,154✔
1355
  if (code != TSDB_CODE_SUCCESS) {
2,003,154!
UNCOV
1356
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1357
  }
1358
  *pRes = needed;
1,999,711✔
1359
  return code;
1,999,711✔
1360
}
1361

1362
// Creates the hash interval aggregate operator ("TimeIntervalAggOperator") for pPhyNode on top of
// downstream.
//   downstream - input operator, attached through appendDownstream()
//   pPhyNode   - physical plan node providing interval/sliding settings, functions, scalar
//                expressions, conditions and limits
//   pTaskInfo  - owning task; receives the error code on failure
//   pOptrInfo  - out: the created operator on success
// Returns TSDB_CODE_SUCCESS, or an error code after the partially built info/operator have been
// passed to destroyIntervalOperatorInfo() / destroyOperatorAndDownstreams().
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pSup->hasWindowOrGroup = true;

  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 512);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  // aggregate functions evaluated per window
  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                    &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  SInterval interval = {.interval = pPhyNode->interval,
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision};

  STimeWindowAggSupp as = {
      .waterMark = pPhyNode->window.watermark,
      .calTrigger = pPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
  };

  pInfo->win = pTaskInfo->window;
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
  pInfo->interval = interval;
  pInfo->twAggSup = as;
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;

  // LIMIT / SLIMIT: the stored bound is limit + offset
  if (pPhyNode->window.node.pLimit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
    pInfo->limited = true;
    pInfo->limit = pLimit->limit + pLimit->offset;
  }
  if (pPhyNode->window.node.pSlimit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
    pInfo->slimited = true;
    pInfo->slimit = pLimit->limit + pLimit->offset;
    pInfo->curGroupId = UINT64_MAX;
  }

  // optional scalar expressions attached to the window node
  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->timeWindowInterpo = false;
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
  QUERY_CHECK_CODE(code, lino, _error);
  if (pInfo->timeWindowInterpo) {
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
    // Record the failure in code; previously this path reached _error with code still set to
    // TSDB_CODE_SUCCESS. NOTE(review): assumes tdListNew() leaves the reason in terrno - confirm.
    QUERY_CHECK_NULL(pInfo->binfo.resultRowInfo.openWindow, code, lino, _error, terrno);
  }

  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);

  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }

  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
1487

1488
// todo handle multiple timeline cases. assume no timeline interweaving
1489
// Feeds one data block into the session window aggregation.
// Rows are walked in block order. A row starts a new window when the group id changes, when no
// previous timestamp has been recorded yet, or when its distance to the previous timestamp exceeds
// pInfo->gap. Before a new window is started because of the gap, the rows collected so far are
// aggregated into the result row of the window being closed. The rows still pending at the end of
// the block are aggregated into the current window's result row, with win.ekey set to the last
// timestamp of the block. Any failure leaves the function through T_LONG_JMP on pTaskInfo->env.
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pExprSup = &pOperator->exprSupp;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (pTsCol == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int64_t gid = pBlock->info.id.groupId;
  int64_t gap = pInfo->gap;

  // first block seen by this operator: forget any previous timestamp
  if (!pInfo->reptScan) {
    pInfo->reptScan = true;
    pInfo->winSup.prevTs = INT64_MIN;
  }

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  // In case of ascending or descending order scan data, only one time window needs to be kept for each table.
  TSKEY* tsList = (TSKEY*)pTsCol->pData;
  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    // new group, or nothing recorded yet: open a window at this row
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
      doKeepNewWindowStartInfo(pRowSup, tsList, row, gid);
      doKeepTuple(pRowSup, tsList[row], gid);
      continue;
    }

    // distance to the previous timestamp (in either direction) is within the gap:
    // the row belongs to the session window that is already open
    if (((tsList[row] - pRowSup->prevTs >= 0) && (tsList[row] - pRowSup->prevTs <= gap)) ||
        ((pRowSup->prevTs - tsList[row] >= 0) && (pRowSup->prevTs - tsList[row] <= gap))) {
      doKeepTuple(pRowSup, tsList[row], gid);
      continue;
    }

    // gap exceeded: aggregate the rows of the window being closed, if there are any
    if (pRowSup->numOfRows > 0) {
      SResultRow* pClosedRow = NULL;

      // work on a copy of the window that is being closed
      STimeWindow closedWin = pRowSup->win;

      int32_t rc = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &closedWin, masterScan, &pClosedRow, gid,
                                          pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &pInfo->aggSup,
                                          pTaskInfo);
      if (rc != TSDB_CODE_SUCCESS) {  // null data, too many state code
        T_LONG_JMP(pTaskInfo->env, rc);
      }

      // pRowSup->numOfRows rows starting at pRowSup->startRowIndex belong to the closed window
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &closedWin, 0);
      rc = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                           pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows,
                                           numOfOutput);
      if (rc != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, rc);
      }
    }

    // here we start a new session window
    doKeepNewWindowStartInfo(pRowSup, tsList, row, gid);
    doKeepTuple(pRowSup, tsList[row], gid);
  }

  // aggregate the rows still pending for the window that is open at the end of the block
  SResultRow* pOpenRow = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t rc = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pOpenRow, gid,
                                      pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &pInfo->aggSup,
                                      pTaskInfo);
  if (rc != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  rc = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                       pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (rc != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }
}
114,164✔
1569

1570
// "Get next" callback of the session window operator.
//   - OP_EXEC_DONE:     *ppRes = NULL, nothing else happens.
//   - OP_RES_TO_RETURN: builds and filters the next result block from the grouped results.
//   - otherwise:        drains the downstream operator, aggregating every block through
//                       doSessionWindowAggImpl(), then initialises the grouped result info and builds
//                       the first result block.
// *ppRes is the result block, or NULL when that block holds no rows. Errors detected here are logged,
// stored in pTaskInfo->code and raised through T_LONG_JMP.
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
  SExprSupp*               pSup = &pOperator->exprSupp;

  pInfo->cleanGroupResInfo = false;

  if (pOperator->status == OP_RES_TO_RETURN) {
    // keep building until a non-empty block is produced or the results are exhausted
    for (;;) {
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      if (!hasRemainResults(&pInfo->groupResInfo)) {
        setOperatorCompleted(pOperator);
        break;
      }

      if (pBInfo->pRes->info.rows > 0) {
        break;
      }
    }

    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
    if (pBInfo->pRes->info.rows == 0) {
      *ppRes = NULL;
    } else {
      *ppRes = pBInfo->pRes;
    }
    return code;
  }

  int64_t startUs = taosGetTimestampUs();
  int32_t order = pInfo->binfo.inputTsOrder;

  // consume the whole downstream input
  for (;;) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;

    // evaluate the scalar expressions in place before aggregating
    SExprSupp* pScalarSup = &pInfo->scalarSupp;
    if (pScalarSup->pExprInfo != NULL) {
      code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                   NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;

  // restore the value
  pOperator->status = OP_RES_TO_RETURN;

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  // same build/filter loop as in the OP_RES_TO_RETURN branch
  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pBInfo->pRes->info.rows == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pBInfo->pRes;
  }
  return code;
}
1669

1670
// todo: make this a non-blocking operator
1671
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
71,099✔
1672
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1673
  QRY_PARAM_CHECK(pOptrInfo);
71,099!
1674

1675
  int32_t                   code = TSDB_CODE_SUCCESS;
71,099✔
1676
  int32_t                   lino = 0;
71,099✔
1677
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
71,099✔
1678
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
71,098✔
1679
  if (pInfo == NULL || pOperator == NULL) {
71,099!
1680
    code = terrno;
×
UNCOV
1681
    goto _error;
×
1682
  }
1683

1684
  pOperator->exprSupp.hasWindowOrGroup = true;
71,099✔
1685
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
71,099✔
1686
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
71,099✔
1687

1688
  if (pStateNode->window.pExprs != NULL) {
71,099✔
1689
    int32_t    numOfScalarExpr = 0;
4,567✔
1690
    SExprInfo* pScalarExprInfo = NULL;
4,567✔
1691
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
4,567✔
1692
    QUERY_CHECK_CODE(code, lino, _error);
4,567!
1693

1694
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
4,567✔
1695
    if (code != TSDB_CODE_SUCCESS) {
4,567!
UNCOV
1696
      goto _error;
×
1697
    }
1698
  }
1699

1700
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
71,099✔
1701
  pInfo->stateKey.type = pInfo->stateCol.type;
71,099✔
1702
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
71,099✔
1703
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
71,099✔
1704
  if (pInfo->stateKey.pData == NULL) {
71,099!
UNCOV
1705
    goto _error;
×
1706
  }
1707
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
71,099✔
1708
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
71,099✔
1709

1710
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
71,099✔
1711
  if (code != TSDB_CODE_SUCCESS) {
71,099!
UNCOV
1712
    goto _error;
×
1713
  }
1714

1715
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
71,099✔
1716

1717
  int32_t    num = 0;
71,099✔
1718
  SExprInfo* pExprInfo = NULL;
71,099✔
1719
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
71,099✔
1720
  QUERY_CHECK_CODE(code, lino, _error);
71,098!
1721

1722
  initResultSizeInfo(&pOperator->resultInfo, 4096);
71,098✔
1723

1724
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
71,098✔
1725
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
71,098✔
1726
  if (code != TSDB_CODE_SUCCESS) {
71,099!
UNCOV
1727
    goto _error;
×
1728
  }
1729

1730
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
71,099✔
1731
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
71,099!
1732
  initBasicInfo(&pInfo->binfo, pResBlock);
71,099✔
1733
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
71,099✔
1734

1735
  pInfo->twAggSup =
71,099✔
1736
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
71,099✔
1737

1738
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
71,099✔
1739
  QUERY_CHECK_CODE(code, lino, _error);
71,099!
1740

1741
  pInfo->tsSlotId = tsSlotId;
71,099✔
1742
  pInfo->pOperator = pOperator;
71,099✔
1743
  pInfo->cleanGroupResInfo = false;
71,099✔
1744
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
71,099✔
1745
                  pTaskInfo);
1746
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
71,099✔
1747
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1748

1749
  code = appendDownstream(pOperator, &downstream, 1);
71,099✔
1750
  if (code != TSDB_CODE_SUCCESS) {
71,099!
UNCOV
1751
    goto _error;
×
1752
  }
1753

1754
  *pOptrInfo = pOperator;
71,099✔
1755
  return TSDB_CODE_SUCCESS;
71,099✔
1756

1757
_error:
×
1758
  if (pInfo != NULL) {
×
UNCOV
1759
    destroyStateWindowOperatorInfo(pInfo);
×
1760
  }
1761

1762
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1763
  pTaskInfo->code = code;
×
UNCOV
1764
  return code;
×
1765
}
1766

1767
void destroySWindowOperatorInfo(void* param) {
96,949✔
1768
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
96,949✔
1769
  if (pInfo == NULL) {
96,949!
UNCOV
1770
    return;
×
1771
  }
1772

1773
  cleanupBasicInfo(&pInfo->binfo);
96,949✔
1774
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
96,949✔
1775
  if (pInfo->pOperator) {
96,948!
1776
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
96,948✔
1777
                      pInfo->cleanGroupResInfo);
96,948✔
1778
    pInfo->pOperator = NULL;
96,949✔
1779
  }
1780

1781
  cleanupAggSup(&pInfo->aggSup);
96,949✔
1782
  cleanupExprSupp(&pInfo->scalarSupp);
96,949✔
1783

1784
  cleanupGroupResInfo(&pInfo->groupResInfo);
96,949✔
1785
  taosMemoryFreeClear(param);
96,949!
1786
}
1787

1788
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
96,945✔
1789
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1790
  QRY_PARAM_CHECK(pOptrInfo);
96,945!
1791

1792
  int32_t                  code = TSDB_CODE_SUCCESS;
96,945✔
1793
  int32_t                  lino = 0;
96,945✔
1794
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
96,945✔
1795
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
96,947✔
1796
  if (pInfo == NULL || pOperator == NULL) {
96,947!
1797
    code = terrno;
×
UNCOV
1798
    goto _error;
×
1799
  }
1800

1801
  pOperator->exprSupp.hasWindowOrGroup = true;
96,947✔
1802

1803
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
96,947✔
1804
  initResultSizeInfo(&pOperator->resultInfo, 4096);
96,947✔
1805

1806
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
96,948✔
1807
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
96,948!
1808
  initBasicInfo(&pInfo->binfo, pResBlock);
96,948✔
1809

1810
  int32_t      numOfCols = 0;
96,948✔
1811
  SExprInfo*   pExprInfo = NULL;
96,948✔
1812
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
96,948✔
1813
  QUERY_CHECK_CODE(code, lino, _error);
96,949!
1814

1815
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
96,949✔
1816
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
96,949✔
1817
  QUERY_CHECK_CODE(code, lino, _error);
96,948!
1818

1819
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
96,948✔
1820
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
96,948✔
1821
  pInfo->gap = pSessionNode->gap;
96,948✔
1822

1823
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
96,948✔
1824
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
96,949✔
1825
  QUERY_CHECK_CODE(code, lino, _error);
96,948!
1826

1827
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
96,948✔
1828
  pInfo->binfo.pRes = pResBlock;
96,948✔
1829
  pInfo->winSup.prevTs = INT64_MIN;
96,948✔
1830
  pInfo->reptScan = false;
96,948✔
1831
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
96,948✔
1832
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
96,948✔
1833

1834
  if (pSessionNode->window.pExprs != NULL) {
96,948✔
1835
    int32_t    numOfScalar = 0;
1✔
1836
    SExprInfo* pScalarExprInfo = NULL;
1✔
1837
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
1✔
1838
    QUERY_CHECK_CODE(code, lino, _error);
1!
1839

1840
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
1✔
1841
    QUERY_CHECK_CODE(code, lino, _error);
1!
1842
  }
1843

1844
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
96,948✔
1845
  QUERY_CHECK_CODE(code, lino, _error);
96,945!
1846

1847
  pInfo->pOperator = pOperator;
96,945✔
1848
  pInfo->cleanGroupResInfo = false;
96,945✔
1849
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
96,945✔
1850
                  pInfo, pTaskInfo);
1851
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
96,944✔
1852
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1853
  pOperator->pTaskInfo = pTaskInfo;
96,944✔
1854
  code = appendDownstream(pOperator, &downstream, 1);
96,944✔
1855
  QUERY_CHECK_CODE(code, lino, _error);
96,944!
1856

1857
  *pOptrInfo = pOperator;
96,944✔
1858
  return TSDB_CODE_SUCCESS;
96,944✔
1859

1860
_error:
×
1861
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
1862
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1863
  pTaskInfo->code = code;
×
UNCOV
1864
  return code;
×
1865
}
1866

1867
void destroyMAIOperatorInfo(void* param) {
534,413✔
1868
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
534,413✔
1869
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
534,413✔
1870
  taosMemoryFreeClear(param);
534,418!
1871
}
534,420✔
1872

1873
// Obtains a fresh result row from the aggregate supporter's result buffer and records its position
// (page id, offset) as the current row of pResultRowInfo.
// Returns the new row, or NULL when getNewResultRow() fails (pResultRowInfo is left untouched then).
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
  SResultRow* pNewRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  if (pNewRow != NULL) {
    pResultRowInfo->cur = (SResultRowPosition){.pageId = pNewRow->pageId, .offset = pNewRow->offset};
  }
  return pNewRow;
}
1881

1882
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
68,185,389✔
1883
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
1884
  if (*pResult == NULL) {
68,185,389✔
1885
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
338,321✔
1886
    if (*pResult == NULL) {
338,322!
UNCOV
1887
      return terrno;
×
1888
    }
1889
  }
1890

1891
  // set time window for current result
1892
  (*pResult)->win = (*win);
68,185,390✔
1893
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
68,185,390✔
1894
}
1895

1896
// Aggregates one input block for the merge-aligned interval operator.
// Rows are consumed in block order; every run of rows sharing the same timestamp forms one window
// [ts, taosTimeAdd(ts, interval) - 1]. Whenever the timestamp changes, the rows of the finished run
// are aggregated, the single reusable result row is finalized into pResultBlock and reset, and the
// row is re-initialised for the next window. The last run of the block is aggregated but not
// finalized here; miaInfo->curTs remembers its timestamp.
// Failures leave the function through T_LONG_JMP on pTaskInfo->env.
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
  SInterval*     pInterval = &iaInfo->interval;

  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);

  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

  // a result from a previous block is still pending (curTs is set)
  if (miaInfo->curTs != INT64_MIN) {
    // this block starts at a different timestamp: flush the pending result first
    if (ts != miaInfo->curTs) {
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
      miaInfo->curTs = ts;
    }
  } else {
    miaInfo->curTs = ts;
  }

  STimeWindow win = {0};
  win.skey = miaInfo->curTs;
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;

  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  int32_t currPos = startPos;

  STimeWindow currWin = win;
  while (++currPos < pBlock->info.rows) {
    // still inside the run of the current timestamp
    if (tsCols[currPos] == miaInfo->curTs) {
      continue;
    }

    // timestamp changed: aggregate rows [startPos, currPos) into the current window
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // emit the finished window and recycle the result row
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
    miaInfo->curTs = tsCols[currPos];

    currWin.skey = miaInfo->curTs;
    currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;

    startPos = currPos;
    // Initialise the recycled row with the window that starts here (currWin), not with the first
    // window of the block (win), so that the row's stored window matches the rows aggregated into it.
    ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  // aggregate the last run of the block; it stays open until the next block or the end of input
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
429,396✔
1968

1969
/**
 * Finish the bookkeeping for a table group whose result rows have just been produced.
 *
 * The result block is tagged with the id of the group that was being aggregated, and the
 * operator state is returned to "no open window" (curTs == INT64_MIN) and "no group" (groupId == 0).
 *
 * @param pMiaInfo merge-aligned interval operator state; its curTs and groupId are reset
 * @param pRes     result block that receives the finished group's id
 */
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
  // INT64_MIN is the sentinel the callers test to learn whether a time window is still open.
  pMiaInfo->curTs = INT64_MIN;

  // Publish the group id before it is cleared.
  pRes->info.id.groupId = pMiaInfo->groupId;
  pMiaInfo->groupId = 0;
}
359,502✔
1974

1975
/**
 * Run one round of merge-aligned interval aggregation for the operator.
 *
 * Input blocks come from the prefetched slot when one is stashed there, otherwise from downstream operator 0,
 * and are handed to doMergeAlignedIntervalAggImpl(). The loop ends when
 *   - the input is exhausted (the last open window is closed and the operator is marked completed),
 *   - a block of a different group arrives and the finished group still has rows after filtering
 *     (the new block is kept in prefetchedBlock for the next call), or
 *   - the result block holds at least pOperator->resultInfo.capacity rows.
 *
 * Failures are raised with T_LONG_JMP on pTaskInfo->env; nothing is returned.
 *
 * @param pOperator the merge-aligned interval operator; pOperator->info must be a
 *                  SMergeAlignedIntervalAggOperatorInfo
 */
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
  int32_t                               code = TSDB_CODE_SUCCESS;
  int32_t                               lino = 0;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;

  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;

  while (1) {
    SSDataBlock* pBlock = NULL;
    if (pMiaInfo->prefetchedBlock == NULL) {
      pBlock = getNextBlockFromDownstream(pOperator, 0);
    } else {
      // resume with the block that was set aside when the previous group ended
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;

      pMiaInfo->groupId = pBlock->info.id.groupId;
    }

    // no data exists, all query processing is done
    if (pBlock == NULL) {
      // close last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      setOperatorCompleted(pOperator);
      break;
    }

    if (pMiaInfo->groupId == 0) {
      // no group is being tracked yet: adopt the id of the incoming block if it is non-zero
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    } else {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        // if there are unclosed time window, close it firstly.
        if (pMiaInfo->curTs == INT64_MIN) {
          // A group switch without an open window is an internal inconsistency. Jump with the code stored in the
          // task rather than with terrno, which is unrelated to this condition.
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
        }
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));

        // keep the block of the new group; it is consumed on the next iteration or the next call
        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
        if (pRes->info.rows == 0) {
          // After filtering for last group, the result is empty, so we need to continue to process next group
          continue;
        } else {
          break;
        }
      } else {
        // continue
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
625,089✔
2065

2066
/**
 * "Get next" entry point of the merge-aligned interval operator.
 *
 * When the operator is already done, *ppRes is set to NULL. Otherwise the shared result block is cleaned and
 * filled by doMergeAlignedIntervalAgg(): once when result merging is disabled, or repeatedly until the operator
 * completes or the block reaches resultInfo.threshold rows when binfo.mergeResultBlock is set.
 *
 * @param pOperator the operator to advance
 * @param ppRes     receives the result block, or NULL when no rows were produced
 * @return always TSDB_CODE_SUCCESS; this function itself never assigns another code
 */
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);

  if (!iaInfo->binfo.mergeResultBlock) {
    // single pass: hand back whatever one round produced
    doMergeAlignedIntervalAgg(pOperator);
  } else {
    // keep accumulating rounds until the input ends or enough rows are collected
    while (pOperator->status != OP_EXEC_DONE && pRes->info.rows < pOperator->resultInfo.threshold) {
      doMergeAlignedIntervalAgg(pOperator);
    }
  }

  size_t produced = pRes->info.rows;
  pOperator->resultInfo.totalRows += produced;

  if (produced == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pRes;
  }
  return TSDB_CODE_SUCCESS;
}
2100

2101
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
534,415✔
2102
                                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2103
  QRY_PARAM_CHECK(pOptrInfo);
534,415!
2104

2105
  int32_t                               code = TSDB_CODE_SUCCESS;
534,415✔
2106
  int32_t                               lino = 0;
534,415✔
2107
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
534,415✔
2108
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
534,419✔
2109
  if (miaInfo == NULL || pOperator == NULL) {
534,415!
2110
    code = terrno;
×
UNCOV
2111
    goto _error;
×
2112
  }
2113

2114
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
534,415✔
2115
  if (miaInfo->intervalAggOperatorInfo == NULL) {
534,413!
2116
    code = terrno;
×
UNCOV
2117
    goto _error;
×
2118
  }
2119

2120
  SInterval interval = {.interval = pNode->interval,
534,413✔
2121
                        .sliding = pNode->sliding,
534,413✔
2122
                        .intervalUnit = pNode->intervalUnit,
534,413✔
2123
                        .slidingUnit = pNode->slidingUnit,
534,413✔
2124
                        .offset = pNode->offset,
534,413✔
2125
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision};
534,413✔
2126

2127
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
534,413✔
2128
  SExprSupp*                pSup = &pOperator->exprSupp;
534,413✔
2129
  pSup->hasWindowOrGroup = true;
534,413✔
2130

2131
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
534,413✔
2132
  QUERY_CHECK_CODE(code, lino, _error);
534,417!
2133

2134
  miaInfo->curTs = INT64_MIN;
534,417✔
2135
  iaInfo->win = pTaskInfo->window;
534,417✔
2136
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
534,417✔
2137
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
534,417✔
2138
  iaInfo->interval = interval;
534,417✔
2139
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
534,417✔
2140
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
534,417✔
2141

2142
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
534,417✔
2143
  initResultSizeInfo(&pOperator->resultInfo, 512);
534,417✔
2144

2145
  int32_t    num = 0;
534,417✔
2146
  SExprInfo* pExprInfo = NULL;
534,417✔
2147
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
534,417✔
2148
  QUERY_CHECK_CODE(code, lino, _error);
534,417!
2149

2150
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
534,417✔
2151
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
534,417✔
2152
  QUERY_CHECK_CODE(code, lino, _error);
534,419!
2153

2154
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
534,419✔
2155
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
534,420!
2156
  initBasicInfo(&iaInfo->binfo, pResBlock);
534,420✔
2157
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
534,419✔
2158
  QUERY_CHECK_CODE(code, lino, _error);
534,417!
2159

2160
  iaInfo->timeWindowInterpo = false;
534,417✔
2161
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
534,417✔
2162
  QUERY_CHECK_CODE(code, lino, _error);
534,410!
2163
  if (iaInfo->timeWindowInterpo) {
534,410!
UNCOV
2164
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2165
  }
2166

2167
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
534,410✔
2168
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
534,411✔
2169
  QUERY_CHECK_CODE(code, lino, _error);
534,419!
2170
  iaInfo->pOperator = pOperator;
534,419✔
2171
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
534,419✔
2172
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2173

2174
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
534,420✔
2175
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2176

2177
  code = appendDownstream(pOperator, &downstream, 1);
534,419✔
2178
  QUERY_CHECK_CODE(code, lino, _error);
534,419!
2179

2180
  *pOptrInfo = pOperator;
534,419✔
2181
  return TSDB_CODE_SUCCESS;
534,419✔
2182

2183
_error:
×
2184
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2185
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2186
  pTaskInfo->code = code;
×
UNCOV
2187
  return code;
×
2188
}
2189

2190
//=====================================================================================================================
2191
// merge interval operator
2192
// State of the merge interval operator: the embedded interval-aggregation state plus per-group window tracking.
typedef struct SMergeIntervalAggOperatorInfo {
2193
  SIntervalAggOperatorInfo intervalAggOperatorInfo;  // embedded by value; its address is handed to the interval helpers
2194
  SList*                   groupIntervals;  // list of SGroupTimeWindow entries, one per (group, window) still tracked
2195
  SListIter                groupIntervalsIter;  // iterator over groupIntervals, initialised once the input is exhausted
2196
  bool                     hasGroupId;  // true after the first input block has supplied a group id
2197
  uint64_t                 groupId;  // group id of the blocks currently being aggregated
2198
  SSDataBlock*             prefetchedBlock;  // first block of the next group, kept until the next call
2199
  bool                     inputBlocksFinished;  // set when the downstream operator returned no more blocks
2200
} SMergeIntervalAggOperatorInfo;
2201

2202
// One element of SMergeIntervalAggOperatorInfo.groupIntervals: a time window together with the group it belongs to.
typedef struct SGroupTimeWindow {
2203
  uint64_t    groupId;  // table group the window was produced for
2204
  STimeWindow window;  // the window's start/end keys (skey/ekey)
2205
} SGroupTimeWindow;
2206

2207
void destroyMergeIntervalOperatorInfo(void* param) {
×
2208
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2209
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
UNCOV
2210
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2211

2212
  taosMemoryFreeClear(param);
×
UNCOV
2213
}
×
2214

UNCOV
2215
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2216
                                        STimeWindow* newWin) {
2217
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2218
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
UNCOV
2219
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2220

2221
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2222
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2223
  if (code != TSDB_CODE_SUCCESS) {
×
2224
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
2225
    return code;
×
2226
  }
2227

2228
  SListIter iter = {0};
×
2229
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2230
  SListNode* listNode = NULL;
×
2231
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2232
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2233
    if (prevGrpWin->groupId != tableGroupId) {
×
UNCOV
2234
      continue;
×
2235
    }
2236

2237
    STimeWindow* prevWin = &prevGrpWin->window;
×
2238
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2239
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
UNCOV
2240
      taosMemoryFreeClear(tmp);
×
2241
    }
2242
  }
2243

UNCOV
2244
  return TSDB_CODE_SUCCESS;
×
2245
}
2246

UNCOV
2247
/**
 * Aggregate one input block into interval windows for the merge interval operator.
 *
 * The window containing the block's first timestamp is processed first, then every following qualified window
 * found by getNextQualifiedWindow() until the block is consumed. Each window is closed with doCloseWindow() and
 * registered through outputPrevIntervalResult().
 *
 * Failures are raised with T_LONG_JMP on pTaskInfo->env, carrying the code reported by the failing step.
 * TSDB_CODE_OUT_OF_MEMORY is used only when a step reports success but yields no result row.
 *
 * @param pOperatorInfo  operator whose info is a SMergeIntervalAggOperatorInfo
 * @param pResultRowInfo result-row bookkeeping of the operator
 * @param pBlock         input block; its group id and timestamp column drive the windowing
 * @param scanFlag       scan flag of the block; compared against MAIN_SCAN when preparing window buffers
 * @param pResultBlock   forwarded unchanged to outputPrevIntervalResult()
 */
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
                                        iaInfo->binfo.inputTsOrder);

  int32_t ret =
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // report the real failure; fall back to out-of-memory only when no code was given for the missing row
    int32_t errCode = (ret != TSDB_CODE_SUCCESS) ? ret : TSDB_CODE_OUT_OF_MEMORY;
    T_LONG_JMP(pTaskInfo->env, errCode);
  }

  // the boundary key of the window in scan direction
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 iaInfo->binfo.inputTsOrder);
  if (forwardRows <= 0) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  // prev time window not interpolation yet.
  if (iaInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        forwardRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
  doCloseWindow(pResultRowInfo, iaInfo, pResult);

  // output previous interval results after this interval (&win) is closed
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    // jump with the code that was just logged
    T_LONG_JMP(pTaskInfo->env, code);
  }

  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      iaInfo->binfo.inputTsOrder);
    if (startPos < 0) {
      // no further window has rows in this block
      break;
    }

    // null data, failed to allocate more memory buffer
    code =
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      int32_t errCode = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
      T_LONG_JMP(pTaskInfo->env, errCode);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           iaInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                           forwardRows, pBlock->info.rows, numOfOutput);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    doCloseWindow(pResultRowInfo, iaInfo, pResult);

    // output previous interval results after this interval (&nextWin) is closed
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  if (iaInfo->timeWindowInterpo) {
    // remember the block's last row for interpolation against the next block
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
  }
}
×
2361

2362
/**
 * "Get next" entry point of the merge interval operator.
 *
 * While input remains, blocks of one group are pulled (from the prefetched slot first, then from downstream
 * operator 0) and passed to doMergeIntervalAggImpl(). A block that belongs to another group is kept in
 * prefetchedBlock and ends the round, as does reaching resultInfo.threshold rows. Once the input is exhausted
 * the result block's group id is taken from the next entry of groupIntervals, if any. The operator is marked
 * completed when the result block is empty.
 *
 * @param pOperator the operator to advance
 * @param ppRes     receives the result block, or NULL when it holds no rows
 * @return TSDB_CODE_SUCCESS on a normal return; failures are logged, stored in pTaskInfo->code and raised with
 *         T_LONG_JMP
 */
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  // initialised here so that no jump to _end can bypass it
  size_t         rows = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!miaInfo->inputBlocksFinished) {
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = getNextBlockFromDownstream(pOperator, 0);
      } else {
        // resume with the block that ended the previous group
        pBlock = miaInfo->prefetchedBlock;
        miaInfo->groupId = pBlock->info.id.groupId;
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
        // input exhausted: from now on results are driven by the recorded group windows
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
        // a new group starts; keep its first block for the next call
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

      pRes->info.scanFlag = pBlock->info.scanFlag;
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

    pRes->info.id.groupId = miaInfo->groupId;
  }

  if (miaInfo->inputBlocksFinished) {
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);

    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
      pRes->info.id.groupId = grpWin->groupId;
    }
  }

  if (pRes->info.rows == 0) {
    setOperatorCompleted(pOperator);
  }

  rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pRes;
  return code;
}
2446

UNCOV
2447
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2448
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
UNCOV
2449
  QRY_PARAM_CHECK(pOptrInfo);
×
2450

2451
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2452
  int32_t                        lino = 0;
×
2453
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2454
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2455
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2456
    code = terrno;
×
UNCOV
2457
    goto _error;
×
2458
  }
2459

2460
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2461
                        .sliding = pIntervalPhyNode->sliding,
×
2462
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2463
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2464
                        .offset = pIntervalPhyNode->offset,
×
UNCOV
2465
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
×
2466

UNCOV
2467
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2468

2469
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2470
  pIntervalInfo->win = pTaskInfo->window;
×
2471
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2472
  pIntervalInfo->interval = interval;
×
2473
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2474
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
UNCOV
2475
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2476

2477
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
UNCOV
2478
  pExprSupp->hasWindowOrGroup = true;
×
2479

2480
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
UNCOV
2481
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2482

2483
  int32_t    num = 0;
×
2484
  SExprInfo* pExprInfo = NULL;
×
2485
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
UNCOV
2486
  QUERY_CHECK_CODE(code, lino, _error);
×
2487

2488
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2489
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2490
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
2491
    goto _error;
×
2492
  }
2493

2494
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2495
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2496
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2497
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
UNCOV
2498
  QUERY_CHECK_CODE(code, lino, _error);
×
2499

2500
  pIntervalInfo->timeWindowInterpo = false;
×
2501
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2502
  QUERY_CHECK_CODE(code, lino, _error);
×
2503
  if (pIntervalInfo->timeWindowInterpo) {
×
2504
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2505
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
UNCOV
2506
      goto _error;
×
2507
    }
2508
  }
2509

2510
  pIntervalInfo->pOperator = pOperator;
×
2511
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
UNCOV
2512
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2513
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
UNCOV
2514
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2515
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2516

2517
  code = appendDownstream(pOperator, &downstream, 1);
×
2518
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
2519
    goto _error;
×
2520
  }
2521

2522
  *pOptrInfo = pOperator;
×
2523
  return TSDB_CODE_SUCCESS;
×
2524
_error:
×
2525
  if (pMergeIntervalInfo != NULL) {
×
UNCOV
2526
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2527
  }
2528
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2529
  pTaskInfo->code = code;
×
UNCOV
2530
  return code;
×
2531
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc