• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4867

26 Nov 2025 05:46AM UTC coverage: 64.473% (-0.03%) from 64.504%
#4867

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

765 of 945 new or added lines in 33 files covered. (80.95%)

3147 existing lines in 126 files now uncovered.

158042 of 245129 relevant lines covered (64.47%)

111495961.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.35
/source/libs/executor/src/externalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22
#include "cmdnodes.h"
23

24
typedef struct SBlockList {
25
  const SSDataBlock* pSrcBlock;
26
  SList*             pBlocks;
27
  int32_t            blockRowNumThreshold;
28
} SBlockList;
29

30

31
typedef int32_t (*extWinGetWinFp)(SOperatorInfo*, int64_t*, int32_t*, SDataBlockInfo*, SExtWinTimeWindow**, int32_t*);
32

33
typedef struct SExtWindowStat {
34
  int64_t resBlockCreated;
35
  int64_t resBlockDestroyed;
36
  int64_t resBlockRecycled;
37
  int64_t resBlockReused;
38
  int64_t resBlockAppend;
39
} SExtWindowStat;
40

41
typedef struct SExternalWindowOperator {
42
  SOptrBasicInfo     binfo;
43
  SExprSupp          scalarSupp;
44
  int32_t            primaryTsIndex;
45
  EExtWinMode        mode;
46
  bool               multiTableMode;
47
  bool               inputHasOrder;
48
  SArray*            pWins;           // SArray<SExtWinTimeWindow>
49
  SArray*            pPseudoColInfo;  
50
  STimeRangeNode*    timeRangeExpr;
51
  
52
  extWinGetWinFp     getWinFp;
53

54
  bool               blkWinStartSet;
55
  int32_t            blkWinStartIdx;
56
  int32_t            blkWinIdx;
57
  int32_t            blkRowStartIdx;
58
  int32_t            outputWinId;
59
  int32_t            outputWinNum;
60
  int32_t            outWinIdx;
61

62
  // for project&indefRows
63
  SList*             pFreeBlocks;    // SList<SSDatablock*+SAarray*>
64
  SArray*            pOutputBlocks;  // SArray<SList*>, for each window, we have a list of blocks
65
  SListNode*         pLastBlkNode; 
66
  SSDataBlock*       pTmpBlock;
67
  
68
  // for agg
69
  SAggSupporter      aggSup;
70
  STimeWindowAggSupp twAggSup;
71

72
  int32_t            resultRowCapacity;
73
  SResultRow*        pResultRow;
74

75
  int64_t            lastSKey;
76
  int32_t            lastWinId;
77
  SSDataBlock*       pEmptyInputBlock;
78
  bool               hasCountFunc;
79
  SExtWindowStat     stat;
80
  SArray*            pWinRowIdx;
81
} SExternalWindowOperator;
82

83

84
static int32_t extWinBlockListAddBlock(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
836✔
85
  SSDataBlock* pRes = NULL;
836✔
86
  int32_t code = 0, lino = 0;
836✔
87

88
  if (listNEles(pExtW->pFreeBlocks) > 0) {
836✔
89
    SListNode* pNode = tdListPopHead(pExtW->pFreeBlocks);
×
90
    *ppBlock = *(SSDataBlock**)pNode->data;
×
91
    *ppIdx = *(SArray**)((SArray**)pNode->data + 1);
×
92
    tdListAppendNode(pList, pNode);
×
93
    pExtW->stat.resBlockReused++;
×
94
  } else {
95
    TAOS_CHECK_EXIT(createOneDataBlock(pExtW->binfo.pRes, false, &pRes));
836✔
96
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, TMAX(rows, 4096)));
836✔
97
    SArray* pIdx = taosArrayInit(10, sizeof(int64_t));
836✔
98
    TSDB_CHECK_NULL(pIdx, code, lino, _exit, terrno);
836✔
99
    void* res[2] = {pRes, pIdx};
836✔
100
    TAOS_CHECK_EXIT(tdListAppend(pList, res));
836✔
101

102
    *ppBlock = pRes;
836✔
103
    *ppIdx = pIdx;
836✔
104
    pExtW->stat.resBlockCreated++;
836✔
105
  }
106
  
107
_exit:
836✔
108

109
  if (code) {
836✔
110
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
111
    blockDataDestroy(pRes);
×
112
  }
113
  
114
  return code;
836✔
115
}
116

117
static int32_t extWinGetLastBlockFromList(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
2,090✔
118
  int32_t    code = 0, lino = 0;
2,090✔
119
  SSDataBlock* pRes = NULL;
2,090✔
120

121
  SListNode* pNode = TD_DLIST_TAIL(pList);
2,090✔
122
  if (NULL == pNode) {
2,090✔
123
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
836✔
124
    return code;
836✔
125
  }
126

127
  pRes = *(SSDataBlock**)pNode->data;
1,254✔
128
  if ((pRes->info.rows + rows) > pRes->info.capacity) {
1,254✔
129
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
×
130
    return code;
×
131
  }
132

133
  *ppIdx = *(SArray**)((SSDataBlock**)pNode->data + 1);
1,254✔
134
  *ppBlock = pRes;
1,254✔
135
  pExtW->stat.resBlockAppend++;
1,254✔
136

137
_exit:
1,254✔
138

139
  if (code) {
1,254✔
140
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
141
  }
142
  
143
  return code;
1,254✔
144
}
145

146
static void extWinDestroyBlockList(void* p) {
17,121,280✔
147
  if (NULL == p) {
17,121,280✔
148
    return;
×
149
  }
150

151
  SListNode* pTmp = NULL;
17,121,280✔
152
  SList** ppList = (SList**)p;
17,121,280✔
153
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
17,121,280✔
154
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
155
    while (pNode) {
×
156
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
157
      blockDataDestroy(pBlock);
×
158
      SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
×
159
      taosArrayDestroy(pIdx);
×
160
      pTmp = pNode;
×
161
      pNode = pNode->dl_next_;
×
162
      taosMemoryFree(pTmp);
×
163
    }
164
  }
165
  taosMemoryFree(*ppList);
17,121,280✔
166
}
167

168

169
static void extWinRecycleBlkNode(SExternalWindowOperator* pExtW, SListNode** ppNode) {
237,770✔
170
  if (NULL == ppNode || NULL == *ppNode) {
237,770✔
171
    return;
238,664✔
172
  }
173

UNCOV
174
  SSDataBlock* pBlock = *(SSDataBlock**)(*ppNode)->data;
×
175
  SArray* pIdx = *(SArray**)((SArray**)(*ppNode)->data + 1);
418✔
176
  
177
  if (listNEles(pExtW->pFreeBlocks) >= 10) {
418✔
178
    blockDataDestroy(pBlock);
×
179
    taosArrayDestroy(pIdx);
×
180
    taosMemoryFreeClear(*ppNode);
×
181
    pExtW->stat.resBlockDestroyed++;
×
182
    return;
×
183
  }
184
  
185
  blockDataCleanup(pBlock);
418✔
186
  taosArrayClear(pIdx);
418✔
187
  tdListPrependNode(pExtW->pFreeBlocks, *ppNode);
418✔
188
  *ppNode = NULL;
418✔
189
  pExtW->stat.resBlockRecycled++;
418✔
190
}
191

192
static void extWinRecycleBlockList(SExternalWindowOperator* pExtW, void* p) {
836✔
193
  if (NULL == p) {
836✔
194
    return;
×
195
  }
196

197
  SListNode* pTmp = NULL;
836✔
198
  SList** ppList = (SList**)p;
836✔
199
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
836✔
200
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
201
    while (pNode) {
×
202
      pTmp = pNode;
×
203
      pNode = pNode->dl_next_;
×
204
      extWinRecycleBlkNode(pExtW, &pTmp);
×
205
    }
206
  }
207
  taosMemoryFree(*ppList);
836✔
208
}
209
static void extWinDestroyBlkNode(SExternalWindowOperator* pInfo, SListNode* pNode) {
69,727✔
210
  if (NULL == pNode) {
69,727✔
211
    return;
68,891✔
212
  }
213

214
  SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
836✔
215
  SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
836✔
216
  
217
  blockDataDestroy(pBlock);
836✔
218
  taosArrayDestroy(pIdx);
836✔
219

220
  taosMemoryFree(pNode);
836✔
221

222
  pInfo->stat.resBlockDestroyed++;
836✔
223
}
224

225

226
void destroyExternalWindowOperatorInfo(void* param) {
69,309✔
227
  if (NULL == param) {
69,309✔
228
    return;
×
229
  }
230
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
69,309✔
231
  cleanupBasicInfo(&pInfo->binfo);
69,309✔
232

233
  taosArrayDestroyEx(pInfo->pOutputBlocks, extWinDestroyBlockList);
69,309✔
234
  taosArrayDestroy(pInfo->pWins);
69,309✔
235
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
69,309✔
236
  taosArrayDestroy(pInfo->pWinRowIdx);
69,309✔
237
  
238
  taosArrayDestroy(pInfo->pPseudoColInfo);
69,309✔
239
  blockDataDestroy(pInfo->pTmpBlock);
69,309✔
240
  blockDataDestroy(pInfo->pEmptyInputBlock);
69,309✔
241

242
  extWinDestroyBlkNode(pInfo, pInfo->pLastBlkNode);
69,309✔
243
  if (pInfo->pFreeBlocks) {
69,309✔
244
    SListNode *node;
245
    while ((node = TD_DLIST_HEAD(pInfo->pFreeBlocks)) != NULL) {
836✔
246
      TD_DLIST_POP(pInfo->pFreeBlocks, node);
418✔
247
      extWinDestroyBlkNode(pInfo, node);
418✔
248
    }
249
    taosMemoryFree(pInfo->pFreeBlocks);
418✔
250
  }
251
  
252
  cleanupAggSup(&pInfo->aggSup);
69,309✔
253
  cleanupExprSupp(&pInfo->scalarSupp);
69,309✔
254
  taosMemoryFreeClear(pInfo->pResultRow);
69,309✔
255

256
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
69,309✔
257

258
  qDebug("ext window stat at destroy, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
69,309✔
259
      pInfo->stat.resBlockCreated, pInfo->stat.resBlockDestroyed, pInfo->stat.resBlockRecycled, 
260
      pInfo->stat.resBlockReused, pInfo->stat.resBlockAppend);
261

262
  taosMemoryFreeClear(pInfo);
69,309✔
263
}
264

265
static int32_t extWinOpen(SOperatorInfo* pOperator);
266
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
267

268
typedef struct SMergeAlignedExternalWindowOperator {
269
  SExternalWindowOperator* pExtW;
270
  int64_t curTs;
271
  SResultRow*  pResultRow;
272
} SMergeAlignedExternalWindowOperator;
273

274
void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
2,846✔
275
  SMergeAlignedExternalWindowOperator* pMlExtInfo = (SMergeAlignedExternalWindowOperator*)pOperator;
2,846✔
276
  destroyExternalWindowOperatorInfo(pMlExtInfo->pExtW);
2,846✔
277
  taosMemoryFreeClear(pMlExtInfo);
2,846✔
278
}
2,846✔
279

280
int64_t* extWinExtractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
4,199,096✔
281
  TSKEY* tsCols = NULL;
4,199,096✔
282

283
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
4,199,096✔
284
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
4,199,096✔
285
    if (!pColDataInfo) {
4,199,096✔
286
      pTaskInfo->code = terrno;
×
287
      T_LONG_JMP(pTaskInfo->env, terrno);
×
288
    }
289

290
    tsCols = (int64_t*)pColDataInfo->pData;
4,199,096✔
291
    if (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0) {
4,199,096✔
292
      int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
4,199,096✔
293
      if (code != TSDB_CODE_SUCCESS) {
4,199,096✔
294
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
295
        pTaskInfo->code = code;
×
296
        T_LONG_JMP(pTaskInfo->env, code);
×
297
      }
298
    }
299
  }
300

301
  return tsCols;
4,199,096✔
302
}
303

304
static int32_t extWinGetCurWinIdx(SExecTaskInfo* pTaskInfo) {
24,687,858✔
305
  return pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
24,687,858✔
306
}
307

308
static void extWinIncCurWinIdx(SOperatorInfo* pOperator) {
×
309
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
310
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx++;
×
311
}
×
312

313
static void extWinSetCurWinIdx(SOperatorInfo* pOperator, int32_t idx) {
24,462,198✔
314
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
24,462,198✔
315
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
24,464,133✔
316
}
24,464,126✔
317

318

319
static void extWinIncCurWinOutIdx(SStreamRuntimeInfo* pStreamRuntimeInfo) {
836✔
320
  pStreamRuntimeInfo->funcInfo.curOutIdx++;
836✔
321
}
836✔
322

323

324
static const STimeWindow* extWinGetNextWin(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
×
325
  int32_t curIdx = extWinGetCurWinIdx(pTaskInfo);
×
326
  if (curIdx + 1 >= pExtW->pWins->size) return NULL;
×
327
  return taosArrayGet(pExtW->pWins, curIdx + 1);
×
328
}
329

330

331
static int32_t extWinAppendWinIdx(SExecTaskInfo*       pTaskInfo, SArray* pIdx, SSDataBlock* pBlock, int32_t currWinIdx, int32_t rows) {
281,898✔
332
  int32_t  code = 0, lino = 0;
281,898✔
333
  int64_t* lastRes = taosArrayGetLast(pIdx);
281,898✔
334
  int32_t* lastWinIdx = (int32_t*)lastRes;
281,898✔
335
  int32_t* lastRowIdx = lastWinIdx ? (lastWinIdx + 1) : NULL;
281,898✔
336
  int64_t  res = 0;
281,898✔
337
  int32_t* pWinIdx = (int32_t*)&res;
281,898✔
338
  int32_t* pRowIdx = pWinIdx + 1;
281,898✔
339

340
  if (lastWinIdx && *lastWinIdx == currWinIdx) {
281,898✔
341
    return code;
1,254✔
342
  }
343

344
  *pWinIdx = currWinIdx;
280,644✔
345
  *pRowIdx = pBlock->info.rows - rows;
280,644✔
346

347
  TSDB_CHECK_NULL(taosArrayPush(pIdx, &res), code, lino, _exit, terrno);
280,644✔
348

349
_exit:
280,644✔
350

351
  if (code) {
280,644✔
352
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
353
  }
354

355
  return code;
280,644✔
356
}
357

358

359
static int32_t mergeAlignExtWinSetOutputBuf(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
3,702✔
360
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
361
  if (*pResult == NULL) {
3,702✔
362
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
2,448✔
363
    if (!*pResult) {
2,448✔
364
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
365
      return terrno;
×
366
    }
367
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
2,448✔
368
  }
369
  
370
  (*pResult)->win = *pWin;
3,702✔
371
  (*pResult)->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
3,702✔
372
  
373
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
3,702✔
374
}
375

376

377
static int32_t mergeAlignExtWinGetWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts, STimeWindow** ppWin) {
3,702✔
378
  int32_t blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
3,702✔
379
  
380
  // TODO handle desc order
381
  for (int32_t i = blkWinIdx; i < pExtW->pWins->size; ++i) {
4,956✔
382
    STimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
4,956✔
383
    if (ts == pWin->skey) {
4,956✔
384
      extWinSetCurWinIdx(pOperator, i);
3,702✔
385
      *ppWin = pWin;
3,702✔
386
      return TSDB_CODE_SUCCESS;
3,702✔
387
    } else if (ts < pWin->skey) {
1,254✔
388
      qError("invalid ts %" PRId64 " for current window idx %d skey %" PRId64, ts, i, pWin->skey);
×
389
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
390
    }
391
  }
392
  
393
  qError("invalid ts %" PRId64 " to find merge aligned ext window, size:%d", ts, (int32_t)pExtW->pWins->size);
×
394
  return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
395
}
396

397
static int32_t mergeAlignExtWinFinalizeResult(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pResultBlock) {
3,702✔
398
  int32_t        code = 0, lino = 0;
3,702✔
399
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
3,702✔
400
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
3,702✔
401
  SExprSupp*     pSup = &pOperator->exprSupp;
3,702✔
402
  SResultRow*  pResultRow = pMlExtInfo->pResultRow;
3,702✔
403
  
404
  finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pOperator->pTaskInfo);
3,702✔
405
  
406
  if (pResultRow->numOfRows > 0) {
3,702✔
407
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResultBlock, pResultRow->winIdx, pResultRow->numOfRows));
3,702✔
408
  }
409

410
_exit:
3,702✔
411

412
  if (code) {
3,702✔
413
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
414
  }
415

416
  return code;
3,702✔
417
}
418

419
static int32_t mergeAlignExtWinAggDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
2,448✔
420
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
2,448✔
421
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
2,448✔
422

423
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,448✔
424
  SExprSupp*     pSup = &pOperator->exprSupp;
2,448✔
425
  int32_t        code = 0, lino = 0;
2,448✔
426
  STimeWindow *pWin = NULL;
2,448✔
427

428
  int32_t startPos = 0;
2,448✔
429
  int64_t* tsCols = extWinExtractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
2,448✔
430
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
2,448✔
431
  
432
  code = mergeAlignExtWinGetWinFromTs(pOperator, pExtW, ts, &pWin);
2,448✔
433
  if (code) {
2,448✔
434
    qError("failed to get time window for ts:%" PRId64 ", prim ts index:%d, error:%s", ts, pExtW->primaryTsIndex, tstrerror(code));
×
435
    TAOS_CHECK_EXIT(code);
×
436
  }
437

438
  if (pMlExtInfo->curTs != INT64_MIN && pMlExtInfo->curTs != pWin->skey) {
2,448✔
439
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
×
440
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
×
441
  }
442
  
443
  TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));
2,448✔
444

445
  int32_t currPos = startPos;
2,448✔
446
  pMlExtInfo->curTs = pWin->skey;
2,448✔
447
  
448
  while (++currPos < pBlock->info.rows) {
6,210✔
449
    if (tsCols[currPos] == pMlExtInfo->curTs) continue;
3,762✔
450

451
    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
1,254✔
452
      pMlExtInfo->curTs, startPos, currPos, tsCols[currPos]); 
453
    TAOS_CHECK_EXIT(applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
1,254✔
454
                                           currPos - startPos, pBlock->info.rows, pSup->numOfExprs));
455

456
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
1,254✔
457
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
1,254✔
458

459
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[currPos], &pWin));
1,254✔
460
    
461
    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
1,254✔
462
    startPos = currPos;
1,254✔
463
    
464
    TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));
1,254✔
465

466
    pMlExtInfo->curTs = pWin->skey;
1,254✔
467
  }
468

469
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
4,896✔
470
                                         currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
2,448✔
471

472
_exit:
2,448✔
473

474
  if (code != 0) {
2,448✔
475
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
476
    T_LONG_JMP(pTaskInfo->env, code);
×
477
  }
478
  
479
  return code;
2,448✔
480
}
481

482
static int32_t mergeAlignExtWinBuildWinRowIdx(SOperatorInfo* pOperator, SSDataBlock* pInput, SSDataBlock* pResult) {
×
483
  SExternalWindowOperator* pExtW = pOperator->info;
×
484
  int64_t* tsCols = extWinExtractTsCol(pInput, pExtW->primaryTsIndex, pOperator->pTaskInfo);
×
485
  STimeWindow* pWin = NULL;
×
486
  int32_t code = 0, lino = 0;
×
487
  int64_t prevTs = INT64_MIN;
×
488
  
489
  for (int32_t i = 0; i < pInput->info.rows; ++i) {
×
490
    if (prevTs == tsCols[i]) {
×
491
      continue;
×
492
    }
493
    
494
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[i], &pWin));
×
495
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResult, extWinGetCurWinIdx(pOperator->pTaskInfo), pInput->info.rows - i));
×
496

497
    prevTs = tsCols[i];
×
498
  }
499

500
_exit:
×
501

502
  if (code != 0) {
×
503
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
504
  }
505

506
  return code;  
×
507
}
508

509
static int32_t mergeAlignExtWinProjectDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
510
                                            SSDataBlock* pResultBlock) {
511
  SExternalWindowOperator* pExtW = pOperator->info;
×
512
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
×
513
  int32_t                  code = 0, lino = 0;
×
514
  
515
  TAOS_CHECK_EXIT(projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
×
516
                        GET_STM_RTINFO(pOperator->pTaskInfo)));
517

518
  TAOS_CHECK_EXIT(mergeAlignExtWinBuildWinRowIdx(pOperator, pBlock, pResultBlock));
×
519

520
_exit:
×
521

522
  if (code != 0) {
×
523
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
524
  }
525

526
  return code;
×
527
}
528

529
void mergeAlignExtWinDo(SOperatorInfo* pOperator) {
2,846✔
530
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
2,846✔
531
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
2,846✔
532
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
2,846✔
533
  SResultRow*                          pResultRow = NULL;
2,846✔
534
  int32_t                              code = 0;
2,846✔
535
  SSDataBlock*                         pRes = pExtW->binfo.pRes;
2,846✔
536
  SExprSupp*                           pSup = &pOperator->exprSupp;
2,846✔
537
  int32_t                              lino = 0;
2,846✔
538

539
  taosArrayClear(pExtW->pWinRowIdx);
2,846✔
540
  blockDataCleanup(pRes);
2,846✔
541

542
  while (1) {
2,448✔
543
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
5,294✔
544

545
    if (pBlock == NULL) {
5,294✔
546
      // close last time window
547
      if (pMlExtInfo->curTs != INT64_MIN && EEXT_MODE_AGG == pExtW->mode) {
2,846✔
548
        TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, &pExtW->binfo.resultRowInfo, pRes));
2,448✔
549
      }
550
      setOperatorCompleted(pOperator);
2,846✔
551
      break;
2,846✔
552
    }
553

554
    pRes->info.scanFlag = pBlock->info.scanFlag;
2,448✔
555
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
2,448✔
556
    QUERY_CHECK_CODE(code, lino, _exit);
2,448✔
557

558
    printDataBlock(pBlock, __func__, "externalwindowAlign", pTaskInfo->id.queryId);
2,448✔
559
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);
2,448✔
560

561
    if (EEXT_MODE_SCALAR == pExtW->mode) {
2,448✔
562
      TAOS_CHECK_EXIT(mergeAlignExtWinProjectDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
×
563
    } else {
564
      TAOS_CHECK_EXIT(mergeAlignExtWinAggDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
2,448✔
565
    }
566

567
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
2,448✔
568
      break;
×
569
    }
570
  }
571

572
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;
2,846✔
573
  
574
_exit:
2,846✔
575

576
  if (code != 0) {
2,846✔
577
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
578
    pTaskInfo->code = code;
×
579
    T_LONG_JMP(pTaskInfo->env, code);
×
580
  }
581
}
2,846✔
582

583
static int32_t mergeAlignExtWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
5,294✔
584
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
5,294✔
585
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
5,294✔
586
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
5,294✔
587
  int32_t                              code = 0;
5,294✔
588
  int32_t lino = 0;
5,294✔
589

590
  if (pOperator->status == OP_EXEC_DONE) {
5,294✔
591
    (*ppRes) = NULL;
2,448✔
592
    return TSDB_CODE_SUCCESS;
2,448✔
593
  }
594

595
  SSDataBlock* pRes = pExtW->binfo.pRes;
2,846✔
596
  blockDataCleanup(pRes);
2,846✔
597

598
  if (taosArrayGetSize(pExtW->pWins) <= 0) {
2,846✔
599
    size_t size = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
2,846✔
600
    STimeWindow* pWin = taosArrayReserve(pExtW->pWins, size);
2,846✔
601
    TSDB_CHECK_NULL(pWin, code, lino, _exit, terrno);
2,846✔
602

603
    for (int32_t i = 0; i < size; ++i) {
6,946✔
604
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
4,100✔
605
      pWin[i].skey = pParam->wstart;
4,100✔
606
      pWin[i].ekey = pParam->wstart + 1;
4,100✔
607
    }
608
    
609
    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
2,846✔
610
  }
611

612
  mergeAlignExtWinDo(pOperator);
2,846✔
613
  
614
  size_t rows = pRes->info.rows;
2,846✔
615
  pOperator->resultInfo.totalRows += rows;
2,846✔
616
  (*ppRes) = (rows == 0) ? NULL : pRes;
2,846✔
617

618
_exit:
2,846✔
619

620
  if (code != 0) {
2,846✔
621
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
622
    pTaskInfo->code = code;
×
623
    T_LONG_JMP(pTaskInfo->env, code);
×
624
  }
625
  return code;
2,846✔
626
}
627

628
int32_t resetMergeAlignedExtWinOperator(SOperatorInfo* pOperator) {
2,846✔
629
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
2,846✔
630
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
2,846✔
631
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
2,846✔
632
  SMergeAlignedIntervalPhysiNode * pPhynode = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;
2,846✔
633
  pOperator->status = OP_NOT_OPENED;
2,846✔
634

635
  taosArrayClear(pExtW->pWins);
2,846✔
636

637
  resetBasicOperatorState(&pExtW->binfo);
2,846✔
638
  pMlExtInfo->pResultRow = NULL;
2,846✔
639
  pMlExtInfo->curTs = INT64_MIN;
2,846✔
640

641
  int32_t code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
5,692✔
642
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
2,846✔
643
                             &pTaskInfo->storageAPI.functionStore);
644
  if (code == 0) {
2,846✔
645
    colDataDestroy(&pExtW->twAggSup.timeWindowData);
2,846✔
646
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
2,846✔
647
  }
648
  return code;
2,846✔
649
}
650

651
int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
2,846✔
652
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
653
  SMergeAlignedIntervalPhysiNode* pPhynode = (SMergeAlignedIntervalPhysiNode*)pNode;
2,846✔
654
  int32_t code = 0;
2,846✔
655
  int32_t lino = 0;
2,846✔
656
  SMergeAlignedExternalWindowOperator* pMlExtInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
2,846✔
657
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,846✔
658

659
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
2,846✔
660
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
2,846✔
661
  }
662
  pOperator->pPhyNode = pNode;
2,846✔
663
  if (!pMlExtInfo || !pOperator) {
2,846✔
664
    code = terrno;
×
665
    goto _error;
×
666
  }
667

668
  pMlExtInfo->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
2,846✔
669
  if (!pMlExtInfo->pExtW) {
2,846✔
670
    code = terrno;
×
671
    goto _error;
×
672
  }
673

674
  SExternalWindowOperator* pExtW = pMlExtInfo->pExtW;
2,846✔
675
  SExprSupp* pSup = &pOperator->exprSupp;
2,846✔
676
  pSup->hasWindowOrGroup = true;
2,846✔
677
  pMlExtInfo->curTs = INT64_MIN;
2,846✔
678

679
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
2,846✔
680
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
2,846✔
681
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
2,846✔
682
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
2,846✔
683

684
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,846✔
685
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2,846✔
686

687
  int32_t num = 0;
2,846✔
688
  SExprInfo* pExprInfo = NULL;
2,846✔
689
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
2,846✔
690
  QUERY_CHECK_CODE(code, lino, _error);
2,846✔
691

692
  if (pExtW->mode == EEXT_MODE_AGG) {
2,846✔
693
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
2,846✔
694
                      &pTaskInfo->storageAPI.functionStore);
695
    QUERY_CHECK_CODE(code, lino, _error);
2,846✔
696
  }
697

698
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
2,846✔
699
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,846✔
700
  initBasicInfo(&pExtW->binfo, pResBlock);
2,846✔
701

702
  pExtW->pWins = taosArrayInit(4096, sizeof(STimeWindow));
2,846✔
703
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
2,846✔
704

705
  pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
2,846✔
706
  TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
2,846✔
707

708
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
2,846✔
709
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
2,846✔
710
  QUERY_CHECK_CODE(code, lino, _error);
2,846✔
711
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMlExtInfo, pTaskInfo);
2,846✔
712
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignExtWinNext, NULL,
2,846✔
713
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
714
                                         optrDefaultGetNextExtFn, NULL);
715
  setOperatorResetStateFn(pOperator, resetMergeAlignedExtWinOperator);
2,846✔
716

717
  code = appendDownstream(pOperator, &pDownstream, 1);
2,846✔
718
  QUERY_CHECK_CODE(code, lino, _error);
2,846✔
719
  *ppOptrOut = pOperator;
2,846✔
720
  return code;
2,846✔
721
  
722
_error:
×
723
  if (pMlExtInfo) destroyMergeAlignedExternalWindowOperator(pMlExtInfo);
×
724
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
725
  pTaskInfo->code = code;
×
726
  return code;
×
727
}
728

729
static int32_t resetExternalWindowExprSupp(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
134,367✔
730
                                           SExternalWindowPhysiNode* pPhynode) {
731
  int32_t    code = 0, lino = 0, num = 0;
134,367✔
732
  SExprInfo* pExprInfo = NULL;
134,367✔
733
  cleanupExprSuppWithoutFilter(&pExtW->scalarSupp);
134,852✔
734

735
  SNodeList* pNodeList = NULL;
135,145✔
736
  if (pPhynode->window.pProjs) {
135,145✔
737
    pNodeList = pPhynode->window.pProjs;
×
738
  } else {
739
    pNodeList = pPhynode->window.pExprs;
135,145✔
740
  }
741

742
  code = createExprInfo(pNodeList, NULL, &pExprInfo, &num);
134,660✔
743
  QUERY_CHECK_CODE(code, lino, _error);
135,145✔
744
  code = initExprSupp(&pExtW->scalarSupp, pExprInfo, num, &pTaskInfo->storageAPI.functionStore);
135,145✔
745
  QUERY_CHECK_CODE(code, lino, _error);
134,660✔
746
  return code;
134,660✔
747
_error:
×
748
  if (code != TSDB_CODE_SUCCESS) {
×
749
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
750
    pTaskInfo->code = code;
×
751
  }
752
  return code;
×
753
}
754

755
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
134,367✔
756
  int32_t code = 0, lino = 0;
134,367✔
757
  SExternalWindowOperator* pExtW = pOperator->info;
134,367✔
758
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
134,367✔
759
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pOperator->pPhyNode;
134,367✔
760
  pOperator->status = OP_NOT_OPENED;
134,852✔
761

762
  //resetBasicOperatorState(&pExtW->binfo);
763
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
134,367✔
764

765
  pExtW->outputWinId = 0;
135,145✔
766
  pExtW->lastWinId = -1;
135,145✔
767
  pExtW->outputWinNum = 0;
134,273✔
768
  taosArrayClear(pExtW->pWins);
134,273✔
769
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);
134,758✔
770

771
/*
772
  int32_t code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
773
  if (code == 0) {
774
    code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
775
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
776
                       &pTaskInfo->storageAPI.functionStore);
777
  }
778
*/
779
  TAOS_CHECK_EXIT(resetExternalWindowExprSupp(pExtW, pTaskInfo, pPhynode));
134,273✔
780
  colDataDestroy(&pExtW->twAggSup.timeWindowData);
134,660✔
781
  TAOS_CHECK_EXIT(initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window));
135,239✔
782

783
  pExtW->outWinIdx = 0;
135,239✔
784
  pExtW->lastSKey = INT64_MIN;
134,754✔
785

786
  qDebug("%s ext window stat at reset, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
134,881✔
787
      pTaskInfo->id.str, pExtW->stat.resBlockCreated, pExtW->stat.resBlockDestroyed, pExtW->stat.resBlockRecycled, 
788
      pExtW->stat.resBlockReused, pExtW->stat.resBlockAppend);
789

790
_exit:
14,118✔
791

792
  if (code) {
135,239✔
793
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
794
  }
795
  
796
  return code;
135,239✔
797
}
798

799
static EDealRes extWinHasCountLikeFunc(SNode* pNode, void* res) {
596,916✔
800
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
596,916✔
801
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
203,720✔
802
    if (fmIsCountLikeFunc(pFunc->funcId) || (pFunc->hasOriginalFunc && fmIsCountLikeFunc(pFunc->originalFuncId))) {
203,720✔
803
      *(bool*)res = true;
47,783✔
804
      return DEAL_RES_END;
48,975✔
805
    }
806
  }
807
  return DEAL_RES_CONTINUE;
551,912✔
808
}
809

810

811
static int32_t extWinCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
46,989✔
812
  int32_t code = TSDB_CODE_SUCCESS;
46,989✔
813
  int32_t lino = 0;
46,989✔
814
  SSDataBlock* pBlock = NULL;
46,989✔
815
  if (!tsCountAlwaysReturnValue) {
48,975✔
816
    return TSDB_CODE_SUCCESS;
×
817
  }
818

819
  SExternalWindowOperator* pExtW = pOperator->info;
48,975✔
820

821
  if (!pExtW->hasCountFunc) {
48,975✔
822
    return TSDB_CODE_SUCCESS;
×
823
  }
824

825
  code = createDataBlock(&pBlock);
48,975✔
826
  if (code) {
48,975✔
827
    return code;
×
828
  }
829

830
  pBlock->info.rows = 1;
48,975✔
831
  pBlock->info.capacity = 0;
48,975✔
832

833
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
226,084✔
834
    SColumnInfoData colInfo = {0};
177,506✔
835
    colInfo.hasNull = true;
177,109✔
836
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
177,109✔
837
    colInfo.info.bytes = 1;
177,109✔
838

839
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
177,109✔
840
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
375,555✔
841
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
198,049✔
842
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
198,049✔
843
        int32_t slotId = pFuncParam->pCol->slotId;
127,473✔
844
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
127,473✔
845
        if (slotId >= numOfCols) {
128,267✔
846
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
82,490✔
847
          QUERY_CHECK_CODE(code, lino, _end);
82,093✔
848

849
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
164,583✔
850
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
82,093✔
851
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
82,490✔
852
          }
853
        }
854
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
70,179✔
855
        // do nothing
856
      }
857
    }
858
  }
859

860
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
48,578✔
861
  QUERY_CHECK_CODE(code, lino, _end);
48,975✔
862

863
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
131,861✔
864
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
83,284✔
865
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
83,284✔
866
    colDataSetNULL(pColInfoData, 0);
867
  }
868
  *ppBlock = pBlock;
48,975✔
869

870
_end:
49,372✔
871
  if (code != TSDB_CODE_SUCCESS) {
48,975✔
872
    blockDataDestroy(pBlock);
×
873
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
874
  }
875
  return code;
48,975✔
876
}
877

878

879

880
static int extWinTsWinCompare(const void* pLeft, const void* pRight) {
6,525,812✔
881
  int64_t ts = *(int64_t*)pLeft;
6,525,812✔
882
  SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)pRight;
6,525,812✔
883
  if (ts < pWin->tw.skey) {
6,525,812✔
884
    return -1;
2,343,501✔
885
  }
886
  if (ts >= pWin->tw.ekey) {
4,182,311✔
887
    return 1;
123,980✔
888
  }
889

890
  return 0;
4,058,331✔
891
}
892

893

894
static int32_t extWinGetMultiTbWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol, int64_t rowNum, int32_t* startPos) {
2,318,491✔
895
  int32_t idx = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
2,318,491✔
896
  if (idx >= 0) {
2,318,491✔
897
    *startPos = 0;
2,317,631✔
898
    return idx;
2,317,631✔
899
  }
900

901
  SExtWinTimeWindow* pWin = NULL;
860✔
902
  int32_t w = 0;
860✔
903
  for (int64_t i = 1; i < rowNum; ++i) {
1,720✔
904
    for (; w < pExtW->pWins->size; ++w) {
1,720✔
905
      pWin = TARRAY_GET_ELEM(pExtW->pWins, w);
1,720✔
906
      if (tsCol[i] < pWin->tw.skey) {
1,720✔
907
        break;
860✔
908
      }
909
      
910
      if (tsCol[i] < pWin->tw.ekey) {
860✔
911
        *startPos = i;
×
912
        return w;
×
913
      }
914
    }
915
  }
916

917
  return -1;
860✔
918
}
919

920
static int32_t extWinGetNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
49,152✔
921
  SExternalWindowOperator* pExtW = pOperator->info;
49,152✔
922
  if ((*startPos) >= pInfo->rows) {
49,152✔
923
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
17,011✔
924
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
925
    *ppWin = NULL;
17,011✔
926
    return TSDB_CODE_SUCCESS;
17,011✔
927
  }
928
  
929
  if (pExtW->blkWinIdx < 0) {
32,141✔
930
    pExtW->blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
17,011✔
931
  } else {
932
    pExtW->blkWinIdx++;
15,130✔
933
  }
934

935
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
32,141✔
936
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
×
937
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
938
    *ppWin = NULL;
×
939
    return TSDB_CODE_SUCCESS;
×
940
  }
941
  
942
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
32,141✔
943
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
32,141✔
944
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
945
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
946
    *ppWin = NULL;
×
947
    return TSDB_CODE_SUCCESS;
×
948
  }
949

950
  int32_t r = *startPos;
32,141✔
951

952
  qDebug("%s %s start to get novlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, r);
32,141✔
953

954
  // TODO handle desc order
955
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
40,581✔
956
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
40,581✔
957
    for (; r < pInfo->rows; ++r) {
42,793✔
958
      if (tsCol[r] < pWin->tw.skey) {
42,793✔
959
        continue;
2,212✔
960
      }
961

962
      if (tsCol[r] < pWin->tw.ekey) {
40,581✔
963
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
32,141✔
964
        *ppWin = pWin;
32,141✔
965
        *startPos = r;
32,141✔
966
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
32,141✔
967

968
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
32,141✔
969
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
970
        
971
        return TSDB_CODE_SUCCESS;
32,141✔
972
      }
973

974
      break;
8,440✔
975
    }
976

977
    if (r == pInfo->rows) {
8,440✔
UNCOV
978
      break;
×
979
    }
980
  }
981

UNCOV
982
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
×
983
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
984

UNCOV
985
  *ppWin = NULL;
×
UNCOV
986
  return TSDB_CODE_SUCCESS;
×
987
}
988

989
static int32_t extWinGetOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
2,674✔
990
  SExternalWindowOperator* pExtW = pOperator->info;
2,674✔
991
  if (pExtW->blkWinIdx < 0) {
2,674✔
992
    pExtW->blkWinIdx = pExtW->blkWinStartIdx;
1,146✔
993
  } else {
994
    pExtW->blkWinIdx++;
1,528✔
995
  }
996

997
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
2,674✔
998
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
1,146✔
999
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1000
    *ppWin = NULL;
1,146✔
1001
    return TSDB_CODE_SUCCESS;
1,146✔
1002
  }
1003
  
1004
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
1,528✔
1005
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
1,528✔
1006
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
1007
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1008
    *ppWin = NULL;
×
1009
    return TSDB_CODE_SUCCESS;
×
1010
  }
1011

1012
  int64_t r = 0;
1,528✔
1013

1014
  qDebug("%s %s start to get ovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);
1,528✔
1015
  
1016
  // TODO handle desc order
1017
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
1,528✔
1018
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
1,528✔
1019
    for (r = pExtW->blkRowStartIdx; r < pInfo->rows; ++r) {
4,966✔
1020
      if (tsCol[r] < pWin->tw.skey) {
4,966✔
1021
        pExtW->blkRowStartIdx = r + 1;
3,438✔
1022
        continue;
3,438✔
1023
      }
1024

1025
      if (tsCol[r] < pWin->tw.ekey) {
1,528✔
1026
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
1,528✔
1027
        *ppWin = pWin;
1,528✔
1028
        *startPos = r;
1,528✔
1029
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
1,528✔
1030

1031
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
1,528✔
1032
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);
1033
        
1034
        if ((r + *winRows) < pInfo->rows) {
1,528✔
1035
          pExtW->blkWinStartIdx = pExtW->blkWinIdx + 1;
382✔
1036
          pExtW->blkWinStartSet = true;
382✔
1037
        }
1038
        
1039
        return TSDB_CODE_SUCCESS;
1,528✔
1040
      }
1041

UNCOV
1042
      break;
×
1043
    }
1044

UNCOV
1045
    if (r >= pInfo->rows) {
×
UNCOV
1046
      if (!pExtW->blkWinStartSet) {
×
UNCOV
1047
        pExtW->blkWinStartIdx = pExtW->blkWinIdx;
×
1048
      }
1049
      
UNCOV
1050
      break;
×
1051
    }
1052
  }
1053

UNCOV
1054
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
×
1055
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1056

UNCOV
1057
  *ppWin = NULL;
×
UNCOV
1058
  return TSDB_CODE_SUCCESS;
×
1059
}
1060

1061

1062
static int32_t extWinGetMultiTbNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
11,856,358✔
1063
  SExternalWindowOperator* pExtW = pOperator->info;
11,856,358✔
1064
  if ((*startPos) >= pInfo->rows) {
11,857,132✔
1065
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
2,317,631✔
1066
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
1067
    *ppWin = NULL;
2,317,631✔
1068
    return TSDB_CODE_SUCCESS;
2,317,631✔
1069
  }
1070
  
1071
  if (pExtW->blkWinIdx < 0) {
9,539,888✔
1072
    pExtW->blkWinIdx = extWinGetMultiTbWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
2,318,491✔
1073
    if (pExtW->blkWinIdx < 0) {
2,318,491✔
1074
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
860✔
1075
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1076
      *ppWin = NULL;
860✔
1077
      return TSDB_CODE_SUCCESS;
860✔
1078
    }
1079

1080
    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
2,317,631✔
1081
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
2,317,631✔
1082
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
2,317,631✔
1083

1084
    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
2,317,631✔
1085
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);
1086
    
1087
    return TSDB_CODE_SUCCESS;
2,317,631✔
1088
  } else {
1089
    pExtW->blkWinIdx++;
7,221,010✔
1090
  }
1091

1092
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
7,221,010✔
1093
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
×
1094
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1095
    *ppWin = NULL;
×
1096
    return TSDB_CODE_SUCCESS;
×
1097
  }
1098
  
1099
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
7,221,010✔
1100
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
7,221,397✔
1101
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
1102
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1103
    *ppWin = NULL;
×
1104
    return TSDB_CODE_SUCCESS;
×
1105
  }
1106

1107
  int32_t r = *startPos;
7,220,623✔
1108

1109
  qDebug("%s %s start to get mnovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, r);
7,220,236✔
1110

1111
  // TODO handle desc order
1112
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
7,247,560✔
1113
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
7,247,560✔
1114
    for (; r < pInfo->rows; ++r) {
7,250,041✔
1115
      if (tsCol[r] < pWin->tw.skey) {
7,250,041✔
1116
        continue;
2,481✔
1117
      }
1118

1119
      if (tsCol[r] < pWin->tw.ekey) {
7,247,560✔
1120
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
7,221,784✔
1121
        *ppWin = pWin;
7,221,784✔
1122
        *startPos = r;
7,221,784✔
1123
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
7,221,784✔
1124

1125
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
7,221,784✔
1126
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
1127
        
1128
        return TSDB_CODE_SUCCESS;
7,221,397✔
1129
      }
1130

1131
      break;
25,776✔
1132
    }
1133

1134
    if (r == pInfo->rows) {
25,776✔
1135
      break;
×
1136
    }
1137
  }
1138

1139
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
×
1140
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1141

1142
  *ppWin = NULL;
×
1143
  return TSDB_CODE_SUCCESS;
×
1144
}
1145

1146
static int32_t extWinGetFirstWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol,
1,860,000✔
1147
                                       int64_t rowNum, int32_t* startPos) {
1148
  SExtWinTimeWindow* pWin = NULL;
1,860,000✔
1149
  int32_t            idx = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
1,860,000✔
1150
  if (idx >= 0) {
1,860,000✔
1151
    for (int i = idx - 1; i >= 0; --i) {
1,740,700✔
1152
      pWin = TARRAY_GET_ELEM(pExtW->pWins, i);
×
1153
      if (extWinTsWinCompare(tsCol, pWin) == 0) {
×
1154
        idx = i;
×
1155
      } else {
1156
        break;
×
1157
      }
1158
    }
1159
    *startPos = 0;
1,740,700✔
1160
    return idx;
1,740,700✔
1161
  }
1162

1163
  pWin = NULL;
119,300✔
1164
  int32_t w = 0;
119,300✔
1165
  for (int64_t i = 1; i < rowNum; ++i) {
239,396✔
1166
    for (; w < pExtW->pWins->size; ++w) {
279,196✔
1167
      pWin = TARRAY_GET_ELEM(pExtW->pWins, w);
279,196✔
1168
      if (tsCol[i] < pWin->tw.skey) {
279,196✔
1169
        break;
120,096✔
1170
      }
1171

1172
      if (tsCol[i] < pWin->tw.ekey) {
159,100✔
1173
        *startPos = i;
39,800✔
1174
        return w;
39,800✔
1175
      }
1176
    }
1177
  }
1178

1179
  return -1;
79,500✔
1180
}
1181

1182
static int32_t extWinGetMultiTbOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
4,432,204✔
1183
  SExternalWindowOperator* pExtW = pOperator->info;
4,432,204✔
1184
  if (pExtW->blkWinIdx < 0) {
4,432,600✔
1185
    pExtW->blkWinIdx = extWinGetFirstWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
1,860,000✔
1186
    if (pExtW->blkWinIdx < 0) {
1,860,000✔
1187
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
79,500✔
1188
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1189
      *ppWin = NULL;
79,500✔
1190
      return TSDB_CODE_SUCCESS;
79,500✔
1191
    }
1192

1193
    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
1,780,500✔
1194
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
1,780,500✔
1195
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
1,780,500✔
1196
    
1197
    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
1,780,500✔
1198
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);
1199
    
1200
    return TSDB_CODE_SUCCESS;
1,780,500✔
1201
  } else {
1202
    pExtW->blkWinIdx++;
2,572,204✔
1203
  }
1204

1205
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
2,572,600✔
1206
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
1,780,500✔
1207
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1208
    *ppWin = NULL;
1,780,500✔
1209
    return TSDB_CODE_SUCCESS;
1,780,500✔
1210
  }
1211
  
1212
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
792,100✔
1213
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
792,100✔
1214
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
1215
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1216
    *ppWin = NULL;
×
1217
    return TSDB_CODE_SUCCESS;
×
1218
  }
1219

1220
  int64_t r = 0;
792,100✔
1221

1222
  qDebug("%s %s start to get movlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);
792,100✔
1223

1224
  // TODO handle desc order
1225
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
910,900✔
1226
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
910,900✔
1227
    for (r = pExtW->blkRowStartIdx; r < pInfo->rows; ++r) {
11,881,776✔
1228
      if (tsCol[r] < pWin->tw.skey) {
11,881,776✔
1229
        pExtW->blkRowStartIdx = r + 1;
10,970,876✔
1230
        continue;
10,970,876✔
1231
      }
1232

1233
      if (tsCol[r] < pWin->tw.ekey) {
910,900✔
1234
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
792,100✔
1235
        *ppWin = pWin;
792,100✔
1236
        *startPos = r;
792,100✔
1237
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
792,100✔
1238

1239
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
792,100✔
1240
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);
1241
        
1242
        return TSDB_CODE_SUCCESS;
792,100✔
1243
      }
1244

1245
      break;
118,800✔
1246
    }
1247

1248
    if (r >= pInfo->rows) {
118,800✔
1249
      break;
×
1250
    }
1251
  }
1252

1253
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
×
1254
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1255

1256
  *ppWin = NULL;
×
1257
  return TSDB_CODE_SUCCESS;
×
1258
}
1259

1260

1261
static int32_t extWinGetWinStartPos(STimeWindow win, const SDataBlockInfo* pBlockInfo, int32_t lastEndPos, int32_t order, int32_t* nextPos, int64_t* tsCol) {
×
1262
  bool ascQuery = order == TSDB_ORDER_ASC;
×
1263

1264
  if (win.ekey <= pBlockInfo->window.skey && ascQuery) {
×
1265
    return -2;
×
1266
  }
1267
//if (win.skey > pBlockInfo->window.ekey && !ascQuery) return -2;
1268

1269
  if (win.skey > pBlockInfo->window.ekey && ascQuery) return -1;
×
1270
//if (win.ekey < pBlockInfo->window.skey && !ascQuery) return -1;
1271

1272
  while (true) {
1273
    if (win.ekey <= tsCol[lastEndPos + 1] && ascQuery) return -2;
×
1274
    if (win.skey <= tsCol[lastEndPos + 1] && ascQuery) break;
×
1275
    lastEndPos++;
×
1276
  }
1277

1278
  *nextPos = lastEndPos + 1;
×
1279
  return 0;
×
1280
}
1281

1282
static int32_t extWinAggSetWinOutputBuf(SOperatorInfo* pOperator, SExtWinTimeWindow* win, SExprSupp* pSupp, 
10,127,956✔
1283
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
1284
  int32_t code = 0, lino = 0;
10,127,956✔
1285
  SResultRow* pResultRow = NULL;
10,127,956✔
1286
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
10,127,956✔
1287
  
1288
#if 0
1289
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
1290
                                                  true, tableGroupId, pTaskInfo, true, pAggSup, true);
1291
  if (pResultRow == NULL) {
1292
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
1293
    return pTaskInfo->code;
1294
  }
1295

1296
  qDebug("current result rows num:%d", tSimpleHashGetSize(pAggSup->pResultRowHashTable));
1297

1298
#else
1299
  if (win->winOutIdx >= 0) {
10,127,956✔
1300
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
9,851,850✔
1301
  } else {
1302
    win->winOutIdx = pExtW->outWinIdx++;
276,106✔
1303
    
1304
    qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
276,106✔
1305

1306
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
276,106✔
1307
    
1308
    memset(pResultRow, 0, pAggSup->resultRowSize);
276,106✔
1309

1310
    pResultRow->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
276,106✔
1311
    TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
276,106✔
1312
  }
1313
#endif
1314

1315
  // set time window for current result
1316
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
10,127,956✔
1317

1318
_exit:
10,127,956✔
1319
  
1320
  if (code) {
10,127,956✔
1321
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1322
  }
1323

1324
  return code;
10,127,956✔
1325
}
1326

1327
static int32_t extWinAggDo(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
12,315,909✔
1328
                                  SSDataBlock* pInputBlock) {
1329
  if (forwardRows == 0) return 0;
12,315,909✔
1330
  SExprSupp*               pSup = &pOperator->exprSupp;
12,315,909✔
1331
  SExternalWindowOperator* pExtW = pOperator->info;
12,315,909✔
1332
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
24,620,598✔
1333
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
12,316,306✔
1334

1335
}
1336

1337
static bool extWinLastWinClosed(SExternalWindowOperator* pExtW) {
836✔
1338
  if (pExtW->outWinIdx <= 0 || (pExtW->multiTableMode && !pExtW->inputHasOrder)) {
836✔
1339
    return false;
836✔
1340
  }
1341

1342
  if (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc) {
×
1343
    return true;
×
1344
  }
1345

1346
  SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outWinIdx - 1);
×
1347
  if (0 == listNEles(pList)) {
×
1348
    return true;
×
1349
  }
1350

1351
  SListNode* pNode = listTail(pList);
×
1352
  SArray* pBlkWinIdx = *((SArray**)pNode->data + 1);
×
1353
  int64_t* pIdx = taosArrayGetLast(pBlkWinIdx);
×
1354
  if (pIdx && *(int32_t*)pIdx < pExtW->blkWinStartIdx) {
×
1355
    return true;
×
1356
  }
1357

1358
  return false;
×
1359
}
1360

1361
static int32_t extWinGetWinResBlock(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
2,090✔
1362
  SExternalWindowOperator* pExtW = pOperator->info;
2,090✔
1363
  SList*                   pList = NULL;
2,090✔
1364
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
2,090✔
1365
  
1366
  if (pWin->winOutIdx >= 0) {
2,090✔
1367
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
1,254✔
1368
  } else {
1369
    if (extWinLastWinClosed(pExtW)) {
836✔
1370
      pWin->winOutIdx = pExtW->outWinIdx - 1;
×
1371
      pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1372
    } else {
1373
      pWin->winOutIdx = pExtW->outWinIdx++;
836✔
1374
      pList = tdListNew(POINTER_BYTES * 2);
836✔
1375
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);
836✔
1376
      SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
836✔
1377
      extWinRecycleBlockList(pExtW, ppList);
836✔
1378
      *ppList = pList;
836✔
1379
    }
1380
  }
1381
  
1382
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));
2,090✔
1383

1384
_exit:
2,090✔
1385

1386
  if (code) {
2,090✔
1387
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1388
  }
1389

1390
  return code;
2,090✔
1391
}
1392

1393
static int32_t extWinProjectDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
2,090✔
1394
  SExternalWindowOperator* pExtW = pOperator->info;
2,090✔
1395
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
2,090✔
1396
  SSDataBlock*             pResBlock = NULL;
2,090✔
1397
  SArray*                  pIdx = NULL;
2,090✔
1398
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
2,090✔
1399
  
1400
  TAOS_CHECK_EXIT(extWinGetWinResBlock(pOperator, rows, pWin, &pResBlock, &pIdx));
2,090✔
1401

1402
  qDebug("%s %s win[%" PRId64 ", %" PRId64 "] got res block %p winRowIdx %p, winOutIdx:%d, capacity:%d", 
2,090✔
1403
      pOperator->pTaskInfo->id.str, __func__, pWin->tw.skey, pWin->tw.ekey, pResBlock, pIdx, pWin->winOutIdx, pResBlock->info.capacity);
1404
  
1405
  if (!pExtW->pTmpBlock) {
2,090✔
1406
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
418✔
1407
  } else {
1408
    blockDataCleanup(pExtW->pTmpBlock);
1,672✔
1409
  }
1410
  
1411
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
2,090✔
1412

1413
  qDebug("%s %s start to copy %d rows to tmp blk", pOperator->pTaskInfo->id.str, __func__, rows);
2,090✔
1414
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
2,090✔
1415

1416
  qDebug("%s %s start to apply project to tmp blk", pOperator->pTaskInfo->id.str, __func__);
2,090✔
1417
  TAOS_CHECK_EXIT(projectApplyFunctionsWithSelect(pExprSup->pExprInfo, pResBlock, pExtW->pTmpBlock, pExprSup->pCtx, pExprSup->numOfExprs,
2,090✔
1418
        NULL, GET_STM_RTINFO(pOperator->pTaskInfo), true, pExprSup->hasIndefRowsFunc));
1419

1420
  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));
2,090✔
1421

1422
_exit:
2,090✔
1423

1424
  if (code) {
2,090✔
1425
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1426
  } else {
1427
    qDebug("%s %s project succeed", pOperator->pTaskInfo->id.str, __func__);
2,090✔
1428
  }
1429
  
1430
  return code;
2,090✔
1431
}
1432

1433
static int32_t extWinProjectOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
1,254✔
1434
  SExternalWindowOperator* pExtW = pOperator->info;
1,254✔
1435
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
1,254✔
1436
  SExtWinTimeWindow*       pWin = NULL;
1,254✔
1437
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
1,254✔
1438
  int32_t                  startPos = 0, winRows = 0;
1,254✔
1439
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
1,254✔
1440
  
1441
  while (true) {
1442
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
3,344✔
1443
    if (pWin == NULL) {
3,344✔
1444
      break;
1,254✔
1445
    }
1446

1447
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") project start, ascScan:%d, startPos:%d, winRows:%d",
2,090✔
1448
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1449
    
1450
    TAOS_CHECK_EXIT(extWinProjectDo(pOperator, pInputBlock, startPos, winRows, pWin));
2,090✔
1451
    
1452
    startPos += winRows;
2,090✔
1453
  }
1454
  
1455
_exit:
1,254✔
1456

1457
  if (code) {
1,254✔
1458
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1459
  }
1460

1461
  return code;
1,254✔
1462
}
1463

1464
static int32_t extWinIndefRowsDoImpl(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
×
1465
  SExternalWindowOperator* pExtW = pOperator->info;
×
1466
  SOptrBasicInfo*     pInfo = &pExtW->binfo;
×
1467
  SExprSupp*          pSup = &pOperator->exprSupp;
×
1468
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
×
1469
  int32_t order = pInfo->inputTsOrder;
×
1470
  int32_t scanFlag = pBlock->info.scanFlag;
×
1471
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
×
1472

1473
  SExprSupp* pScalarSup = &pExtW->scalarSupp;
×
1474
  if (pScalarSup->pExprInfo != NULL) {
×
1475
    TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
1476
                                 pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1477
  }
1478

1479
  TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, order, scanFlag, false));
×
1480

1481
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));
×
1482

1483
  TAOS_CHECK_EXIT(projectApplyFunctions(pSup->pExprInfo, pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
×
1484
                               pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1485

1486
_exit:
×
1487

1488
  if (code) {
×
1489
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
1490
  }
1491

1492
  return code;
×
1493
}
1494

1495
static int32_t extWinIndefRowsSetWinOutputBuf(SExternalWindowOperator* pExtW, SExtWinTimeWindow* win, SExprSupp* pSupp, 
×
1496
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo, bool reset) {
1497
  int32_t code = 0, lino = 0;
×
1498
  SResultRow* pResultRow = NULL;
×
1499

1500
  pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
×
1501
  
1502
  qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
×
1503

1504
  if (reset) {
×
1505
    memset(pResultRow, 0, pAggSup->resultRowSize);
×
1506
    for (int32_t k = 0; k < pSupp->numOfExprs; ++k) {
×
1507
      SqlFunctionCtx* pCtx = &pSupp->pCtx[k];
×
1508
      pCtx->pOutput = NULL;
×
1509
    }
1510
  }
1511

1512
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
×
1513

1514
  // set time window for current result
1515
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
×
1516

1517
_exit:
×
1518
  
1519
  if (code) {
×
1520
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1521
  }
1522

1523
  return code;
×
1524
}
1525

1526
static int32_t extWinGetSetWinResBlockBuf(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
×
1527
  SExternalWindowOperator* pExtW = pOperator->info;
×
1528
  SList*                   pList = NULL;
×
1529
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1530
  
1531
  if (pWin->winOutIdx >= 0) {
×
1532
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1533
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, false));
×
1534
  } else {
1535
    if (extWinLastWinClosed(pExtW)) {
×
1536
      pWin->winOutIdx = pExtW->outWinIdx - 1;
×
1537
      pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1538
    } else {
1539
      pWin->winOutIdx = pExtW->outWinIdx++;
×
1540
      pList = tdListNew(POINTER_BYTES * 2);
×
1541
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);
×
1542
      SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1543
      extWinRecycleBlockList(pExtW, ppList);
×
1544
      *ppList = pList;
×
1545
    }
1546
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, true));
×
1547
  }
1548
  
1549
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));
×
1550

1551
_exit:
×
1552

1553
  if (code) {
×
1554
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1555
  }
1556

1557
  return code;
×
1558
}
1559

1560

1561
static int32_t extWinIndefRowsDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
×
1562
  SExternalWindowOperator* pExtW = pOperator->info;
×
1563
  SSDataBlock*             pResBlock = NULL;
×
1564
  SArray*                  pIdx = NULL;
×
1565
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1566
  
1567
  TAOS_CHECK_EXIT(extWinGetSetWinResBlockBuf(pOperator, rows, pWin, &pResBlock, &pIdx));
×
1568
  
1569
  if (!pExtW->pTmpBlock) {
×
1570
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
×
1571
  } else {
1572
    blockDataCleanup(pExtW->pTmpBlock);
×
1573
  }
1574
  
1575
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
×
1576

1577
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
×
1578
  TAOS_CHECK_EXIT(extWinIndefRowsDoImpl(pOperator, pResBlock, pExtW->pTmpBlock));
×
1579

1580
  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));
×
1581

1582
_exit:
×
1583

1584
  if (code) {
×
1585
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1586
  }
1587
  
1588
  return code;
×
1589
}
1590

1591

1592
static int32_t extWinIndefRowsOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
×
1593
  SExternalWindowOperator* pExtW = pOperator->info;
×
1594
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
×
1595
  SExtWinTimeWindow*       pWin = NULL;
×
1596
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
×
1597
  int32_t                  startPos = 0, winRows = 0;
×
1598
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1599
  
1600
  while (true) {
1601
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
×
1602
    if (pWin == NULL) {
×
1603
      break;
×
1604
    }
1605

1606
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") indefRows start, ascScan:%d, startPos:%d, winRows:%d",
×
1607
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1608
    
1609
    TAOS_CHECK_EXIT(extWinIndefRowsDo(pOperator, pInputBlock, startPos, winRows, pWin));
×
1610
    
1611
    startPos += winRows;
×
1612
  }
1613
  
1614
_exit:
×
1615

1616
  if (code) {
×
1617
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1618
  }
1619

1620
  return code;
×
1621
}
1622

1623
static int32_t extWinNonAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
836✔
1624
  SExternalWindowOperator* pExtW = pOperator->info;
836✔
1625
  int32_t                  numOfWin = pExtW->outWinIdx;
836✔
1626
  int32_t                  code = TSDB_CODE_SUCCESS;
836✔
1627
  int32_t                  lino = 0;
836✔
1628
  SSDataBlock*             pRes = NULL;
836✔
1629

1630
  for (; pExtW->outputWinId < numOfWin; pExtW->outputWinId++, extWinIncCurWinOutIdx(pOperator->pTaskInfo->pStreamRuntimeInfo)) {
836✔
1631
    SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
836✔
1632
    if (listNEles(pList) <= 0) {
836✔
1633
      continue;
×
1634
    }
1635

1636
    SListNode* pNode = tdListPopHead(pList);
836✔
1637
    pRes = *(SSDataBlock**)pNode->data;
836✔
1638
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = *(SArray**)((SArray**)pNode->data + 1);
836✔
1639
    pExtW->pLastBlkNode = pNode;
836✔
1640

1641
    if (listNEles(pList) <= 0) {
836✔
1642
      pExtW->outputWinId++;
836✔
1643
      extWinIncCurWinOutIdx(pOperator->pTaskInfo->pStreamRuntimeInfo);
836✔
1644
    }
1645

1646
    break;
836✔
1647
  }
1648

1649
  if (pRes) {
836✔
1650
    qDebug("%s result generated, rows:%" PRId64 , GET_TASKID(pOperator->pTaskInfo), pRes->info.rows);
836✔
1651
    pRes->info.version = pOperator->pTaskInfo->version;
836✔
1652
    pRes->info.dataLoad = 1;
836✔
1653
  } else {
1654
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = NULL;
×
1655
    qDebug("%s ext window done", GET_TASKID(pOperator->pTaskInfo));
×
1656
  }
1657

1658
  *ppRes = (pRes && pRes->info.rows > 0) ? pRes : NULL;
836✔
1659

1660
_exit:
836✔
1661

1662
  if (code != TSDB_CODE_SUCCESS) {
836✔
1663
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1664
  }
1665

1666
  return code;
836✔
1667
}
1668

1669
static int32_t extWinAggHandleEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains, SExtWinTimeWindow* pWin) {
12,246,695✔
1670
  int32_t code = 0, lino = 0;
12,246,695✔
1671
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
12,246,695✔
1672
  SExprSupp* pSup = &pOperator->exprSupp;
12,246,695✔
1673
  int32_t currIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
12,245,921✔
1674

1675
  if (NULL == pExtW->pEmptyInputBlock || (pWin && pWin->tw.skey == pExtW->lastSKey)) {
12,246,308✔
1676
    goto _exit;
2,266,689✔
1677
  }
1678

1679
  bool ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
9,980,006✔
1680
  int32_t endIdx = allRemains ? (pExtW->pWins->size - 1) : (currIdx - 1);
9,980,006✔
1681
  SResultRowInfo* pResultRowInfo = &pExtW->binfo.resultRowInfo;
9,980,006✔
1682
  SSDataBlock* pInput = pExtW->pEmptyInputBlock;
9,980,006✔
1683

1684
  if ((pExtW->lastWinId + 1) <= endIdx) {
9,980,393✔
1685
    TAOS_CHECK_EXIT(setInputDataBlock(pSup, pExtW->pEmptyInputBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
133,474✔
1686
  }
1687
  
1688
  for (int32_t i = pExtW->lastWinId + 1; i <= endIdx; ++i) {
10,153,105✔
1689
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
172,712✔
1690

1691
    extWinSetCurWinIdx(pOperator, i);
172,712✔
1692
    qDebug("%s %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
172,712✔
1693
           GET_TASKID(pOperator->pTaskInfo), i, pWin->tw.skey, pWin->tw.ekey, ascScan);
1694

1695
    TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pWin, pSup, &pExtW->aggSup, pOperator->pTaskInfo));
172,712✔
1696

1697
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
172,712✔
1698
    code = extWinAggDo(pOperator, 0, 1, pInput);
172,712✔
1699
    pExtW->lastWinId = i;  
172,712✔
1700
    TAOS_CHECK_EXIT(code);
172,712✔
1701
  }
1702

1703
  
1704
_exit:
9,980,393✔
1705

1706
  if (code) {
12,247,082✔
1707
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
1708
  } else {
1709
    if (pBlock) {
12,247,082✔
1710
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true));
12,143,594✔
1711
    }
1712

1713
    if (!allRemains) {
12,246,297✔
1714
      extWinSetCurWinIdx(pOperator, currIdx);  
12,142,809✔
1715
    }
1716
  }
1717

1718
  return code;
12,246,297✔
1719
}
1720

1721
static int32_t extWinAggOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
4,195,394✔
1722
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
4,195,394✔
1723
  int32_t                  startPos = 0, winRows = 0;
4,195,394✔
1724
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
4,195,394✔
1725
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
4,195,394✔
1726
  int32_t                  code = 0, lino = 0;
4,195,394✔
1727
  SExtWinTimeWindow*       pWin = NULL;
4,195,394✔
1728
  bool                     scalarCalc = false;
4,195,394✔
1729

1730
  while (true) {
1731
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
16,337,431✔
1732
    if (pWin == NULL) {
16,338,988✔
1733
      break;
4,195,394✔
1734
    }
1735

1736
    TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pInputBlock, false, pWin));
12,143,594✔
1737

1738
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") agg start, ascScan:%d, startPos:%d, winRows:%d",
12,142,422✔
1739
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1740

1741
    if (!scalarCalc) {
12,143,594✔
1742
      if (pExtW->scalarSupp.pExprInfo) {
4,115,034✔
1743
        SExprSupp* pScalarSup = &pExtW->scalarSupp;
×
1744
        TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pInputBlock, pInputBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
1745
                                     pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1746
      }
1747
      
1748
      scalarCalc = true;
4,115,034✔
1749
    }
1750

1751
    if (pWin->tw.skey != pExtW->lastSKey) {
12,143,594✔
1752
      TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo));
9,955,244✔
1753
    }
1754
    
1755
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
12,143,594✔
1756
    TAOS_CHECK_EXIT(extWinAggDo(pOperator, startPos, winRows, pInputBlock));
12,143,197✔
1757
    
1758
    pExtW->lastSKey = pWin->tw.skey;
12,143,207✔
1759
    pExtW->lastWinId = extWinGetCurWinIdx(pOperator->pTaskInfo);
12,142,811✔
1760
    startPos += winRows;
12,142,037✔
1761
  }
1762

1763
_exit:
4,195,394✔
1764

1765
  if (code) {
4,195,394✔
1766
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1767
  }
1768

1769
  return code;
4,195,394✔
1770
}
1771

1772
static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
103,488✔
1773
  SExternalWindowOperator* pExtW = pOperator->info;
103,488✔
1774
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
103,488✔
1775
  SSDataBlock*             pBlock = pExtW->binfo.pRes;
103,488✔
1776
  int32_t                  code = TSDB_CODE_SUCCESS;
103,488✔
1777
  int32_t                  lino = 0;
103,488✔
1778
  SExprInfo*               pExprInfo = pOperator->exprSupp.pExprInfo;
103,488✔
1779
  int32_t                  numOfExprs = pOperator->exprSupp.numOfExprs;
103,488✔
1780
  int32_t*                 rowEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;
103,488✔
1781
  SqlFunctionCtx*          pCtx = pOperator->exprSupp.pCtx;
103,488✔
1782
  int32_t                  numOfWin = pExtW->outWinIdx;
103,488✔
1783

1784
  pBlock->info.version = pTaskInfo->version;
103,488✔
1785
  blockDataCleanup(pBlock);
103,488✔
1786
  taosArrayClear(pExtW->pWinRowIdx);
103,488✔
1787

1788
  for (; pExtW->outputWinId < pExtW->pWins->size; pExtW->outputWinId += 1) {
395,188✔
1789
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->outputWinId);
291,700✔
1790
    int32_t            winIdx = pWin->winOutIdx;
291,700✔
1791
    if (winIdx < 0) {
291,700✔
1792
      continue;
15,594✔
1793
    }
1794

1795
    pExtW->outputWinNum++;
276,106✔
1796
    SResultRow* pRow = (SResultRow*)((char*)pExtW->pResultRow + winIdx * pExtW->aggSup.resultRowSize);
276,106✔
1797

1798
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
276,106✔
1799

1800
    // no results, continue to check the next one
1801
    if (pRow->numOfRows == 0) {
276,106✔
1802
      continue;
×
1803
    }
1804

1805
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
276,106✔
1806
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + numOfWin - pExtW->outputWinNum;
79,659✔
1807
      TAOS_CHECK_EXIT(blockDataEnsureCapacity(pBlock, newSize));
79,659✔
1808
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
79,659✔
1809
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
1810
    }
1811

1812
    TAOS_CHECK_EXIT(copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo));
276,106✔
1813

1814
    pBlock->info.rows += pRow->numOfRows;
276,106✔
1815

1816
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pRow->winIdx, pRow->numOfRows));
276,106✔
1817

1818
    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
276,106✔
1819
      break;
×
1820
    }
1821
  }
1822

1823
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
103,488✔
1824
         pBlock->info.id.groupId);
1825
         
1826
  pBlock->info.dataLoad = 1;
103,488✔
1827

1828
  *ppRes = (pBlock->info.rows > 0) ? pBlock : NULL;
103,488✔
1829
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;
103,488✔
1830

1831
_exit:
103,488✔
1832

1833
  if (code != TSDB_CODE_SUCCESS) {
103,488✔
1834
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1835
  }
1836

1837
  return code;
103,488✔
1838
}
1839

1840
static int32_t extWinInitResultRow(SExecTaskInfo*        pTaskInfo, SExternalWindowOperator* pExtW, int32_t winNum) {
102,714✔
1841
  if (EEXT_MODE_SCALAR == pExtW->mode) {
102,714✔
1842
    return TSDB_CODE_SUCCESS;
418✔
1843
  }
1844

1845
  if (winNum <= pExtW->resultRowCapacity) {
103,488✔
1846
    return TSDB_CODE_SUCCESS;
21,839✔
1847
  }
1848
  
1849
  taosMemoryFreeClear(pExtW->pResultRow);
81,649✔
1850
  pExtW->resultRowCapacity = -1;
81,649✔
1851

1852
  int32_t code = 0, lino = 0;
81,649✔
1853
  
1854
  pExtW->pResultRow = taosMemoryCalloc(winNum, pExtW->aggSup.resultRowSize);
81,649✔
1855
  QUERY_CHECK_NULL(pExtW->pResultRow, code, lino, _exit, terrno);
81,649✔
1856

1857
  pExtW->resultRowCapacity = winNum;
81,649✔
1858

1859
_exit:
81,649✔
1860

1861
  if (code) {
81,649✔
1862
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1863
  }
1864

1865
  return code;
81,649✔
1866
}
1867

1868
static void extWinFreeResultRow(SExternalWindowOperator* pExtW) {
103,488✔
1869
  if (pExtW->resultRowCapacity * pExtW->aggSup.resultRowSize >= 1048576) {
103,488✔
1870
    taosMemoryFreeClear(pExtW->pResultRow);
×
1871
    pExtW->resultRowCapacity = -1;
×
1872
  }
1873
  if (pExtW->binfo.pRes && pExtW->binfo.pRes->info.rows * pExtW->aggSup.resultRowSize >= 1048576) {
103,488✔
1874
    blockDataFreeCols(pExtW->binfo.pRes);
×
1875
  }
1876
}
103,488✔
1877

1878
static int32_t extWinInitWindowList(SExternalWindowOperator* pExtW, SExecTaskInfo*        pTaskInfo) {
103,906✔
1879
  if (taosArrayGetSize(pExtW->pWins) > 0) {
103,906✔
1880
    return TSDB_CODE_SUCCESS;
×
1881
  }
1882
  
1883
  int32_t code = 0, lino = 0;
103,906✔
1884
  SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
103,906✔
1885
  size_t size = taosArrayGetSize(pInfo->pStreamPesudoFuncVals);
103,906✔
1886
  SExtWinTimeWindow* pWin = taosArrayReserve(pExtW->pWins, size);
103,906✔
1887
  TSDB_CHECK_NULL(pWin, code, lino, _exit, terrno);
103,906✔
1888

1889
  TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, size));
103,906✔
1890

1891
  if (pExtW->timeRangeExpr && pExtW->timeRangeExpr->needCalc) {
103,906✔
1892
    TAOS_CHECK_EXIT(scalarCalculateExtWinsTimeRange(pExtW->timeRangeExpr, pInfo, pWin));
19,746✔
1893
    if (qDebugFlag & DEBUG_DEBUG) {
19,348✔
1894
      for (int32_t i = 0; i < size; ++i) {
49,778✔
1895
        qDebug("%s the %d/%d ext window calced initialized, TR[%" PRId64 ", %" PRId64 ")", 
30,032✔
1896
            pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
1897
      }
1898
    }
1899
  } else {
1900
    for (int32_t i = 0; i < size; ++i) {
346,266✔
1901
      SSTriggerCalcParam* pParam = taosArrayGet(pInfo->pStreamPesudoFuncVals, i);
262,106✔
1902

1903
      pWin[i].tw.skey = pParam->wstart;
261,676✔
1904
      pWin[i].tw.ekey = pParam->wend + ((pInfo->triggerType != STREAM_TRIGGER_SLIDING) ? 1 : 0);
261,676✔
1905
      pWin[i].winOutIdx = -1;
262,106✔
1906

1907
      qDebug("%s the %d/%d ext window initialized, TR[%" PRId64 ", %" PRId64 ")", 
262,106✔
1908
          pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
1909
    }
1910
  }
1911
  
1912
  pExtW->outputWinId = pInfo->curIdx;
103,906✔
1913
  pExtW->lastWinId = -1;
103,906✔
1914
  pExtW->blkWinStartIdx = pInfo->curIdx;
103,906✔
1915

1916
_exit:
103,906✔
1917

1918
  if (code) {
103,906✔
1919
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1920
  }
1921

1922
  return code;
103,906✔
1923
}
1924

1925
static bool extWinNonAggGotResBlock(SExternalWindowOperator* pExtW) {
1,254✔
1926
  if (pExtW->multiTableMode && !pExtW->inputHasOrder) {
1,254✔
1927
    return false;
1,254✔
1928
  }
1929
  int32_t remainWin = pExtW->outWinIdx - pExtW->outputWinId;
×
1930
  if (remainWin > 1 && (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc)) {
×
1931
    return true;
×
1932
  }
1933
  
1934
  SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
×
1935
  if (!pList || listNEles(pList) <= 0) {
×
1936
    return false;
×
1937
  }
1938
  if (listNEles(pList) > 1) {
×
1939
    return true;
×
1940
  }
1941

1942
  SListNode* pNode = listHead(pList);
×
1943
  SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
×
1944
  int32_t* winIdx = taosArrayGetLast(pIdx);
×
1945
  if (winIdx && *winIdx < pExtW->blkWinStartIdx) {
×
1946
    return true;
×
1947
  }
1948

1949
  return false;
×
1950
}
1951

1952
static int32_t extWinOpen(SOperatorInfo* pOperator) {
104,324✔
1953
  if (OPTR_IS_OPENED(pOperator)) {
104,324✔
1954
    return TSDB_CODE_SUCCESS;
418✔
1955
  }
1956
  
1957
  int32_t                  code = 0;
103,906✔
1958
  int32_t                  lino = 0;
103,906✔
1959
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
103,906✔
1960
  SOperatorInfo*           pDownstream = pOperator->pDownstream[0];
103,906✔
1961
  SExternalWindowOperator* pExtW = pOperator->info;
103,906✔
1962
  SExprSupp*               pSup = &pOperator->exprSupp;
103,906✔
1963
  
1964
  TAOS_CHECK_EXIT(extWinInitWindowList(pExtW, pTaskInfo));
103,906✔
1965

1966
  while (1) {
4,196,648✔
1967
    pExtW->blkWinIdx = -1;
4,300,554✔
1968
    pExtW->blkWinStartSet = false;
4,300,554✔
1969
    pExtW->blkRowStartIdx = 0;
4,300,554✔
1970
    
1971
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
4,300,554✔
1972
    if (pBlock == NULL) {
4,300,554✔
1973
      if (EEXT_MODE_AGG == pExtW->mode) {
103,906✔
1974
        TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pBlock, true, NULL));
103,488✔
1975
      }
1976
      pExtW->blkWinStartIdx = pExtW->pWins->size;
103,906✔
1977
      break;
103,906✔
1978
    }
1979

1980
    printDataBlock(pBlock, __func__, pTaskInfo->id.str, pTaskInfo->id.queryId);
4,196,648✔
1981

1982
    qDebug("ext window mode:%d got %" PRId64 " rows from downstream", pExtW->mode, pBlock->info.rows);
4,196,648✔
1983
    
1984
    switch (pExtW->mode) {
4,197,035✔
1985
      case EEXT_MODE_SCALAR:
1,254✔
1986
        TAOS_CHECK_EXIT(extWinProjectOpen(pOperator, pBlock));
1,254✔
1987
        if (extWinNonAggGotResBlock(pExtW)) {
1,254✔
1988
          return code;
×
1989
        }
1990
        break;
1,254✔
1991
      case EEXT_MODE_AGG:
4,195,394✔
1992
        TAOS_CHECK_EXIT(extWinAggOpen(pOperator, pBlock));
4,195,394✔
1993
        break;
4,195,394✔
1994
      case EEXT_MODE_INDEFR_FUNC:
×
1995
        TAOS_CHECK_EXIT(extWinIndefRowsOpen(pOperator, pBlock));
×
1996
        if (extWinNonAggGotResBlock(pExtW)) {
×
1997
          return code;
×
1998
        }
1999
        break;
×
2000
      default:
×
2001
        break;
×
2002
    }
2003
  }
2004

2005
  OPTR_SET_OPENED(pOperator);
103,906✔
2006

2007
#if 0
2008
  if (pExtW->mode == EEXT_MODE_AGG) {
2009
    qDebug("ext window before dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
2010

2011
    code = initGroupedResultInfo(&pExtW->groupResInfo, pExtW->aggSup.pResultRowHashTable, pExtW->binfo.inputTsOrder);
2012
    QUERY_CHECK_CODE(code, lino, _exit);
2013

2014
    qDebug("ext window after dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
2015
  }
2016
#endif
2017

2018
_exit:
103,906✔
2019

2020
  if (code != 0) {
103,906✔
2021
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
2022
    pTaskInfo->code = code;
×
2023
    T_LONG_JMP(pTaskInfo->env, code);
×
2024
  }
2025
  
2026
  return code;
103,906✔
2027
}
2028

2029
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
202,120✔
2030
  int32_t                  code = 0;
202,120✔
2031
  int32_t                  lino = 0;
202,120✔
2032
  SExternalWindowOperator* pExtW = pOperator->info;
202,120✔
2033
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
202,120✔
2034

2035
  if (pOperator->status == OP_EXEC_DONE) {
202,120✔
2036
    *ppRes = NULL;
97,796✔
2037
    return code;
97,796✔
2038
  }
2039

2040
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);
104,324✔
2041

2042
  TAOS_CHECK_EXIT(pOperator->fpSet._openFn(pOperator));
104,324✔
2043

2044
  if (pExtW->mode == EEXT_MODE_SCALAR || pExtW->mode == EEXT_MODE_INDEFR_FUNC) {
104,324✔
2045
    TAOS_CHECK_EXIT(extWinNonAggOutputRes(pOperator, ppRes));
836✔
2046
    if (NULL == *ppRes) {
836✔
2047
      setOperatorCompleted(pOperator);
×
2048
      extWinFreeResultRow(pExtW);
×
2049
    }
2050
  } else {
2051
#if 0    
2052
    doBuildResultDatablock(pOperator, &pExtW->binfo, &pExtW->groupResInfo, pExtW->aggSup.pResultBuf);
2053
    bool hasRemain = hasRemainResults(&pExtW->groupResInfo);
2054
    if (!hasRemain) {
2055
      setOperatorCompleted(pOperator);
2056
      break;
2057
    }
2058
    if (pExtW->binfo.pRes->info.rows > 0) break;
2059
#else
2060
    TAOS_CHECK_EXIT(extWinAggOutputRes(pOperator, ppRes));
103,488✔
2061
    setOperatorCompleted(pOperator);
103,488✔
2062
    extWinFreeResultRow(pExtW);
103,488✔
2063
#endif      
2064
  }
2065

2066
  if (*ppRes) {
104,324✔
2067
    pOperator->resultInfo.totalRows += (*ppRes)->info.rows;
102,334✔
2068
  }
2069
  
2070
_exit:
1,990✔
2071

2072
  if (code) {
104,324✔
2073
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
×
2074
    pTaskInfo->code = code;
×
2075
    T_LONG_JMP(pTaskInfo->env, code);
×
2076
  }
2077

2078
  if ((*ppRes) && (*ppRes)->info.rows <= 0) {
104,324✔
2079
    *ppRes = NULL;
×
2080
  }
2081

2082
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM && (*ppRes)) {
104,324✔
2083
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
102,334✔
2084
  }
2085
  
2086
  return code;
104,324✔
2087
}
2088

2089

2090
int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
66,463✔
2091
                                     SOperatorInfo** pOptrOut) {
2092
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
66,463✔
2093
  QRY_PARAM_CHECK(pOptrOut);
66,463✔
2094
  int32_t                  code = 0;
66,463✔
2095
  int32_t                  lino = 0;
66,463✔
2096
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
66,463✔
2097
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
66,463✔
2098
  pOperator->pPhyNode = pNode;
66,463✔
2099
  if (!pExtW || !pOperator) {
66,463✔
UNCOV
2100
    code = terrno;
×
2101
    lino = __LINE__;
×
2102
    goto _error;
×
2103
  }
2104
  
2105
  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
66,463✔
2106
                  pExtW, pTaskInfo);
2107
                  
2108
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
66,463✔
2109
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
66,463✔
2110
  initBasicInfo(&pExtW->binfo, pResBlock);
66,463✔
2111

2112
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
66,463✔
2113
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
66,463✔
2114
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
66,463✔
2115
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
66,463✔
2116

2117
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
66,463✔
2118
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
66,463✔
2119
  }
2120

2121
  // pExtW->limitInfo = (SLimitInfo){0};
2122
  // initLimitInfo(pPhynode->window.node.pLimit, pPhynode->window.node.pSlimit, &pExtW->limitInfo);
2123

2124
  if (pPhynode->window.pProjs) {
66,463✔
2125
    int32_t    numOfScalarExpr = 0;
418✔
2126
    SExprInfo* pScalarExprInfo = NULL;
418✔
2127
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
418✔
2128
    QUERY_CHECK_CODE(code, lino, _error);
418✔
2129

2130
    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
418✔
2131
    QUERY_CHECK_CODE(code, lino, _error);
418✔
2132

2133
  //if (pExtW->multiTableMode) {
2134
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
418✔
2135
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _error);
418✔
2136
  //}
2137
    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
418✔
2138
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
418✔
2139
  } else if (pExtW->mode == EEXT_MODE_AGG) {
66,045✔
2140
    if (pPhynode->window.pExprs != NULL) {
66,045✔
2141
      int32_t    num = 0;
×
2142
      SExprInfo* pSExpr = NULL;
×
2143
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
2144
      QUERY_CHECK_CODE(code, lino, _error);
×
2145
    
2146
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
2147
      if (code != TSDB_CODE_SUCCESS) {
×
2148
        goto _error;
×
2149
      }
2150
      checkIndefRowsFuncs(&pExtW->scalarSupp);
×
2151
    }
2152
    
2153
    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
66,045✔
2154
    initResultSizeInfo(&pOperator->resultInfo, 4096);
66,045✔
2155
    //code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
2156
    //QUERY_CHECK_CODE(code, lino, _error);
2157

2158
    pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
66,045✔
2159
    TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
66,045✔
2160
    
2161
    int32_t num = 0;
66,045✔
2162
    SExprInfo* pExprInfo = NULL;
66,045✔
2163
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
66,045✔
2164
    QUERY_CHECK_CODE(code, lino, _error);
66,045✔
2165
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, 0, 0);
66,045✔
2166
    QUERY_CHECK_CODE(code, lino, _error);
65,687✔
2167

2168
    nodesWalkExprs(pPhynode->window.pFuncs, extWinHasCountLikeFunc, &pExtW->hasCountFunc);
65,687✔
2169
    if (pExtW->hasCountFunc) {
66,045✔
2170
      code = extWinCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
48,975✔
2171
      QUERY_CHECK_CODE(code, lino, _error);
48,975✔
2172
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
48,975✔
2173
    } else {
2174
      qDebug("%s ext window doesn't have CountLikeFunc", pOperator->pTaskInfo->id.str);
17,070✔
2175
    }
2176

2177
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
66,045✔
2178
    QUERY_CHECK_CODE(code, lino, _error);
66,045✔
2179

2180
    pExtW->lastSKey = INT64_MIN;
66,045✔
2181
  } else {
2182
    size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2183
    
2184
    if (pPhynode->window.pExprs != NULL) {
×
2185
      int32_t    num = 0;
×
2186
      SExprInfo* pSExpr = NULL;
×
2187
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
2188
      QUERY_CHECK_CODE(code, lino, _error);
×
2189
    
2190
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
2191
      if (code != TSDB_CODE_SUCCESS) {
×
2192
        goto _error;
×
2193
      }
2194
    }
2195
    
2196
    int32_t    numOfExpr = 0;
×
2197
    SExprInfo* pExprInfo = NULL;
×
2198
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
×
2199
    TSDB_CHECK_CODE(code, lino, _error);
×
2200
    
2201
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
×
2202
                              pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2203
    TSDB_CHECK_CODE(code, lino, _error);
×
2204
    pOperator->exprSupp.hasWindowOrGroup = false;
×
2205
    
2206
    //code = setFunctionResultOutput(pOperator, &pExtW->binfo, &pExtW->aggSup, MAIN_SCAN, numOfExpr);
2207
    //TSDB_CHECK_CODE(code, lino, _error);
2208
    
2209
    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
×
2210
                              pTaskInfo->pStreamRuntimeInfo);
×
2211
    TSDB_CHECK_CODE(code, lino, _error);
×
2212
    
2213
    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
×
2214
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
×
2215
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
×
2216
    TSDB_CHECK_CODE(code, lino, _error);
×
2217

2218
  //if (pExtW->multiTableMode) {
2219
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
×
2220
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _error);
×
2221
  //}
2222
    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
×
2223
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);  
×
2224
  }
2225

2226
  pExtW->pWins = taosArrayInit(4096, sizeof(SExtWinTimeWindow));
66,463✔
2227
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
66,463✔
2228
  
2229
  //initResultRowInfo(&pExtW->binfo.resultRowInfo);
2230

2231
  pExtW->timeRangeExpr = (STimeRangeNode*)pPhynode->pTimeRange;
66,463✔
2232
  if (pExtW->timeRangeExpr) {
66,463✔
2233
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pStart, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
66,463✔
2234
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pEnd, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
66,463✔
2235
  }
2236

2237
  if (pPhynode->isSingleTable) {
66,463✔
2238
    pExtW->getWinFp = (pExtW->timeRangeExpr && (pExtW->timeRangeExpr->needCalc || (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP))) ? extWinGetOvlpWin : extWinGetNoOvlpWin;
12,628✔
2239
    pExtW->multiTableMode = false;
12,628✔
2240
  } else {
2241
    pExtW->getWinFp = (pExtW->timeRangeExpr && (pExtW->timeRangeExpr->needCalc || (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP))) ? extWinGetMultiTbOvlpWin : extWinGetMultiTbNoOvlpWin;
53,835✔
2242
    pExtW->multiTableMode = true;
53,835✔
2243
  }
2244
  pExtW->inputHasOrder = pPhynode->inputHasOrder;
66,463✔
2245

2246
  pOperator->fpSet = createOperatorFpSet(extWinOpen, extWinNext, NULL, destroyExternalWindowOperatorInfo,
66,463✔
2247
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2248
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
66,463✔
2249
  code = appendDownstream(pOperator, &pDownstream, 1);
66,463✔
2250
  if (code != 0) {
66,463✔
2251
    goto _error;
×
2252
  }
2253

2254
  *pOptrOut = pOperator;
66,463✔
2255
  return code;
66,463✔
2256

2257
_error:
×
2258

2259
  if (pExtW != NULL) {
×
2260
    destroyExternalWindowOperatorInfo(pExtW);
×
2261
  }
2262

2263
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
2264
  pTaskInfo->code = code;
×
2265
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
×
2266
  return code;
×
2267
}
2268

2269

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc