• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4996

19 Mar 2026 02:16AM UTC coverage: 72.069% (+0.07%) from 71.996%
#4996

push

travis-ci

web-flow
feat: SQL firewall black/white list (#34798)

461 of 618 new or added lines in 4 files covered. (74.6%)

380 existing lines in 128 files now uncovered.

245359 of 340448 relevant lines covered (72.07%)

135732617.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.87
/source/libs/executor/src/externalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22
#include "cmdnodes.h"
23

24
typedef struct SBlockList {
25
  const SSDataBlock* pSrcBlock;
26
  SList*             pBlocks;
27
  int32_t            blockRowNumThreshold;
28
} SBlockList;
29

30

31
// Callback stored in SExternalWindowOperator::getWinFp. It returns an int32_t status code.
// NOTE(review): the meaning of each parameter is not visible in this part of the file - confirm against the implementations.
typedef int32_t (*extWinGetWinFp)(SOperatorInfo*, int64_t*, int32_t*, SDataBlockInfo*, SExtWinTimeWindow**, int32_t*);
32

33
/*
 * Counters describing how result blocks are managed by the external window
 * operator. They are printed once when the operator is destroyed.
 */
typedef struct SExtWindowStat {
  int64_t resBlockCreated;    /* new block allocated in extWinBlockListAddBlock */
  int64_t resBlockDestroyed;  /* block released for good (destroy or recycle overflow) */
  int64_t resBlockRecycled;   /* block moved onto the free list for later reuse */
  int64_t resBlockReused;     /* block taken back from the free list */
  int64_t resBlockAppend;     /* rows appended to the existing tail block of a list */
} SExtWindowStat;
40

41
typedef struct SExternalWindowOperator {
42
  SOptrBasicInfo     binfo;
43
  SExprSupp          scalarSupp;
44
  int32_t            primaryTsIndex;
45
  EExtWinMode        mode;
46
  bool               multiTableMode;
47
  bool               inputHasOrder;
48
  SArray*            pWins;           // SArray<SExtWinTimeWindow>
49
  SArray*            pPseudoColInfo;  
50
  STimeRangeNode*    timeRangeExpr;
51
  
52
  extWinGetWinFp     getWinFp;
53

54
  bool               blkWinStartSet;
55
  int32_t            blkWinStartIdx;
56
  int32_t            blkWinIdx;
57
  int32_t            blkRowStartIdx;
58
  int32_t            outputWinId;
59
  int32_t            outputWinNum;
60
  int32_t            outWinIdx;
61

62
  // for project&indefRows
63
  SList*             pFreeBlocks;    // SList<SSDatablock*+SAarray*>
64
  SArray*            pOutputBlocks;  // SArray<SList*>, for each window, we have a list of blocks
65
  SListNode*         pLastBlkNode; 
66
  SSDataBlock*       pTmpBlock;
67
  
68
  // for agg
69
  SAggSupporter      aggSup;
70
  STimeWindowAggSupp twAggSup;
71

72
  int32_t            resultRowCapacity;
73
  SResultRow*        pResultRow;
74

75
  int64_t            lastSKey;
76
  int64_t            lastEKey;
77
  int32_t            lastWinId;
78
  SSDataBlock*       pEmptyInputBlock;
79
  bool               hasCountFunc;
80
  SExtWindowStat     stat;
81
  SArray*            pWinRowIdx;
82

83
  // for vtable window query
84
  bool               isDynWindow;
85
  int32_t            orgTableVgId;
86
  tb_uid_t           orgTableUid;
87
  STimeWindow        orgTableTimeRange;
88
} SExternalWindowOperator;
89

90

91
/*
 * Append one result block, together with its window/row index array, to pList.
 *
 * A node is taken from pExtW->pFreeBlocks when one is available; otherwise a
 * new block (capacity TMAX(rows, 4096)) and a new index array are created and
 * appended as a {block, index} pointer pair.
 *
 * @param pExtW    operator state (free list, output block template, counters)
 * @param pList    list that receives the node
 * @param rows     number of rows the caller wants to store
 * @param ppBlock  out: the block to write into
 * @param ppIdx    out: the index array that belongs to *ppBlock
 * @return 0 on success, otherwise an error code; on failure nothing allocated
 *         here is left behind.
 */
static int32_t extWinBlockListAddBlock(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
  SSDataBlock* pRes = NULL;
  SArray*      pIdx = NULL;
  int32_t      code = 0, lino = 0;

  if (listNEles(pExtW->pFreeBlocks) > 0) {
    // Reuse a recycled node: it already carries a cleaned block and index array.
    SListNode* pNode = tdListPopHead(pExtW->pFreeBlocks);
    *ppBlock = *(SSDataBlock**)pNode->data;
    *ppIdx = *(SArray**)((SArray**)pNode->data + 1);
    tdListAppendNode(pList, pNode);
    pExtW->stat.resBlockReused++;
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pExtW->binfo.pRes, false, &pRes));
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, TMAX(rows, 4096)));
    pIdx = taosArrayInit(10, sizeof(int64_t));
    TSDB_CHECK_NULL(pIdx, code, lino, _exit, terrno);
    void* res[2] = {pRes, pIdx};
    TAOS_CHECK_EXIT(tdListAppend(pList, res));

    // From here on the list node refers to both objects.
    *ppBlock = pRes;
    *ppIdx = pIdx;
    pExtW->stat.resBlockCreated++;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // Every failure point is before the pair was stored in the list, so both
    // objects are still exclusively ours. The index array used to be leaked here.
    blockDataDestroy(pRes);
    taosArrayDestroy(pIdx);
  }

  return code;
}
123

124
/*
 * Return a block of pList that can take `rows` more rows.
 *
 * The tail block is returned when it has enough spare capacity; otherwise
 * (empty list or tail too full) a block is added via extWinBlockListAddBlock.
 *
 * @param ppBlock  out: block to append into
 * @param ppIdx    out: index array stored next to that block
 * @return 0 on success, otherwise the error code of extWinBlockListAddBlock.
 */
static int32_t extWinGetLastBlockFromList(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
  int32_t code = 0, lino = 0;

  SListNode* pTail = TD_DLIST_TAIL(pList);
  if (pTail != NULL) {
    SSDataBlock* pLast = *(SSDataBlock**)pTail->data;
    if ((pLast->info.rows + rows) <= pLast->info.capacity) {
      // Tail block still has room: hand it out together with its index array.
      *ppIdx = *(SArray**)((SSDataBlock**)pTail->data + 1);
      *ppBlock = pLast;
      pExtW->stat.resBlockAppend++;
      return code;
    }
  }

  // No tail block, or not enough room left in it.
  TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
  return code;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
152

153
static void extWinDestroyBlockList(void* p) {
40,837,120✔
154
  if (NULL == p) {
40,837,120✔
155
    return;
×
156
  }
157

158
  SListNode* pTmp = NULL;
40,837,120✔
159
  SList** ppList = (SList**)p;
40,837,120✔
160
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
40,837,120✔
161
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
162
    while (pNode) {
×
163
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
164
      blockDataDestroy(pBlock);
×
165
      SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
×
166
      taosArrayDestroy(pIdx);
×
167
      pTmp = pNode;
×
168
      pNode = pNode->dl_next_;
×
169
      taosMemoryFree(pTmp);
×
170
    }
171
  }
172
  taosMemoryFree(*ppList);
40,837,120✔
173
}
174

175

176
/*
 * Give a block node back to the operator.
 *
 * While fewer than 10 nodes are parked in pFreeBlocks the node is cleaned and
 * prepended to that list; otherwise its block, index array and the node
 * itself are freed. In both cases *ppNode is NULL afterwards.
 */
static void extWinRecycleBlkNode(SExternalWindowOperator* pExtW, SListNode** ppNode) {
  if (ppNode == NULL || *ppNode == NULL) {
    return;
  }

  SListNode*   pNode = *ppNode;
  SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
  SArray*      pIdx = *(SArray**)((SArray**)pNode->data + 1);

  if (listNEles(pExtW->pFreeBlocks) < 10) {
    // Keep it for reuse: wipe the contents but keep the allocations.
    blockDataCleanup(pBlock);
    taosArrayClear(pIdx);
    tdListPrependNode(pExtW->pFreeBlocks, pNode);
    *ppNode = NULL;
    pExtW->stat.resBlockRecycled++;
    return;
  }

  // Free list is full: release everything.
  blockDataDestroy(pBlock);
  taosArrayDestroy(pIdx);
  taosMemoryFreeClear(*ppNode);
  pExtW->stat.resBlockDestroyed++;
}
198

199
/*
 * Recycle every node of the list stored in the SList* slot `p` via
 * extWinRecycleBlkNode, then free the list header itself.
 */
static void extWinRecycleBlockList(SExternalWindowOperator* pExtW, void* p) {
  if (p == NULL) {
    return;
  }

  SList* pList = *(SList**)p;
  if (pList != NULL && TD_DLIST_NELES(pList) > 0) {
    for (SListNode* pCur = TD_DLIST_HEAD(pList); pCur != NULL;) {
      // Step forward first; recycling relinks or frees the current node.
      SListNode* pVictim = pCur;
      pCur = pCur->dl_next_;
      extWinRecycleBlkNode(pExtW, &pVictim);
    }
  }

  taosMemoryFree(pList);
}
216
/*
 * Free a single block node: its data block, its index array and the node
 * memory. Counts the destruction in pInfo->stat. NULL nodes are ignored.
 */
static void extWinDestroyBlkNode(SExternalWindowOperator* pInfo, SListNode* pNode) {
  if (pNode != NULL) {
    SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
    SArray*      pIdx = *(SArray**)((SArray**)pNode->data + 1);

    blockDataDestroy(pBlock);
    taosArrayDestroy(pIdx);
    taosMemoryFree(pNode);

    pInfo->stat.resBlockDestroyed++;
  }
}
231

232

233
void destroyExternalWindowOperatorInfo(void* param) {
1,087,390✔
234
  if (NULL == param) {
1,087,390✔
235
    return;
×
236
  }
237
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
1,087,390✔
238
  cleanupBasicInfo(&pInfo->binfo);
1,087,390✔
239

240
  taosArrayDestroyEx(pInfo->pOutputBlocks, extWinDestroyBlockList);
1,087,390✔
241
  taosArrayDestroy(pInfo->pWins);
1,087,390✔
242
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,087,390✔
243
  taosArrayDestroy(pInfo->pWinRowIdx);
1,087,390✔
244
  
245
  taosArrayDestroy(pInfo->pPseudoColInfo);
1,087,390✔
246
  blockDataDestroy(pInfo->pTmpBlock);
1,087,390✔
247
  blockDataDestroy(pInfo->pEmptyInputBlock);
1,087,390✔
248

249
  extWinDestroyBlkNode(pInfo, pInfo->pLastBlkNode);
1,087,390✔
250
  if (pInfo->pFreeBlocks) {
1,087,390✔
251
    SListNode *node;
252
    while ((node = TD_DLIST_HEAD(pInfo->pFreeBlocks)) != NULL) {
1,238✔
253
      TD_DLIST_POP(pInfo->pFreeBlocks, node);
241✔
254
      extWinDestroyBlkNode(pInfo, node);
241✔
255
    }
256
    taosMemoryFree(pInfo->pFreeBlocks);
997✔
257
  }
258
  
259
  cleanupAggSup(&pInfo->aggSup);
1,087,390✔
260
  cleanupExprSupp(&pInfo->scalarSupp);
1,087,390✔
261
  taosMemoryFreeClear(pInfo->pResultRow);
1,087,390✔
262

263
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
1,087,390✔
264

265
  qDebug("ext window stat at destroy, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
1,087,390✔
266
      pInfo->stat.resBlockCreated, pInfo->stat.resBlockDestroyed, pInfo->stat.resBlockRecycled, 
267
      pInfo->stat.resBlockReused, pInfo->stat.resBlockAppend);
268

269
  taosMemoryFreeClear(pInfo);
1,087,390✔
270
}
271

272
static int32_t extWinOpen(SOperatorInfo* pOperator);
273
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
274

275
typedef struct SMergeAlignedExternalWindowOperator {
276
  SExternalWindowOperator* pExtW;
277
  int64_t curTs;
278
  SResultRow*  pResultRow;
279
} SMergeAlignedExternalWindowOperator;
280

281
void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
17,345✔
282
  SMergeAlignedExternalWindowOperator* pMlExtInfo = (SMergeAlignedExternalWindowOperator*)pOperator;
17,345✔
283
  destroyExternalWindowOperatorInfo(pMlExtInfo->pExtW);
17,345✔
284
  taosMemoryFreeClear(pMlExtInfo);
17,345✔
285
}
17,345✔
286

287
/*
 * Return the raw timestamp array of column `primaryTsIndex` of pBlock.
 *
 * Returns NULL when the block has no column array or its data is not loaded.
 * When the block window is still {0, 0} it is recomputed with
 * blockDataUpdateTsWindow. A missing column or a failed window update stores
 * the error in pTaskInfo->code and leaves through T_LONG_JMP.
 */
int64_t* extWinExtractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
  if (pTsCol == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  TSKEY* tsCols = (int64_t*)pTsCol->pData;

  bool windowUnset = (pBlock->info.window.skey == 0) && (pBlock->info.window.ekey == 0);
  if (windowUnset) {
    int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
310

311
/*
 * Current window index of the task: funcInfo.curIdx of the stream runtime
 * info, or 0 when the task has no stream runtime info.
 */
static int32_t extWinGetCurWinIdx(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo->pStreamRuntimeInfo) {
    return pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  }
  return 0;
}
317

318
/*
 * Advance the task's current window index (funcInfo.curIdx) by one.
 * Does nothing when the task has no stream runtime info, in line with
 * extWinGetCurWinIdx / extWinSetCurWinIdx which also treat it as optional.
 */
static void extWinIncCurWinIdx(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pTaskInfo->pStreamRuntimeInfo) {
    pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx++;
  }
}
×
322

323
/*
 * Store `idx` as the task's current window index (funcInfo.curIdx).
 * A task without stream runtime info is left untouched.
 */
static void extWinSetCurWinIdx(SOperatorInfo* pOperator, int32_t idx) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (!pTaskInfo->pStreamRuntimeInfo) {
    return;
  }
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
}
2,147,483,647✔
329

330

331
/* Advance funcInfo.curOutIdx of the given stream runtime info by one. */
static void extWinIncCurWinOutIdx(SStreamRuntimeInfo* pStreamRuntimeInfo) {
  pStreamRuntimeInfo->funcInfo.curOutIdx += 1;
}
1,238✔
334

335

336
/*
 * Return the window that follows the task's current window in pExtW->pWins,
 * or NULL when the current window is the last one.
 */
static const STimeWindow* extWinGetNextWin(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
  int32_t nextIdx = extWinGetCurWinIdx(pTaskInfo) + 1;
  if (nextIdx >= pExtW->pWins->size) {
    return NULL;
  }
  return taosArrayGet(pExtW->pWins, nextIdx);
}
341

342

343
/*
 * Record in pIdx that window `currWinIdx` starts at a given row of pBlock.
 *
 * Each pIdx element is an int64_t whose first four bytes hold the window
 * index and whose last four bytes hold the row index
 * (pBlock->info.rows - rows). Nothing is appended when the last recorded
 * entry already belongs to `currWinIdx`.
 *
 * The two halves are packed and unpacked with memcpy instead of going through
 * an int32_t* alias of the int64_t object; the byte layout is unchanged.
 *
 * @return 0 on success, terrno when the array push fails.
 */
static int32_t extWinAppendWinIdx(SExecTaskInfo*       pTaskInfo, SArray* pIdx, SSDataBlock* pBlock, int32_t currWinIdx, int32_t rows) {
  int32_t code = 0, lino = 0;

  const int64_t* pLastEntry = taosArrayGetLast(pIdx);
  if (pLastEntry != NULL) {
    int32_t lastPair[2];  // [0] = window index, [1] = row index
    memcpy(lastPair, pLastEntry, sizeof(lastPair));
    if (lastPair[0] == currWinIdx) {
      // Same window as the previous entry: its start row is already recorded.
      return code;
    }
  }

  int32_t newPair[2] = {currWinIdx, (int32_t)(pBlock->info.rows - rows)};
  int64_t res = 0;
  memcpy(&res, newPair, sizeof(res));

  TSDB_CHECK_NULL(taosArrayPush(pIdx, &res), code, lino, _exit, terrno);

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
369

370
static int32_t extWinRebuildWinIdxByFilter(SExecTaskInfo* pTaskInfo, SArray* pIdx, int32_t rowsBeforeFilter,
235✔
371
                                           SColumnInfoData* pFilterRes) {
372
  int32_t code = TSDB_CODE_SUCCESS;
235✔
373
  int32_t lino = 0;
235✔
374

375
  if (pIdx == NULL || pFilterRes == NULL || rowsBeforeFilter <= 0 || taosArrayGetSize(pIdx) == 0) {
235✔
376
    return code;
×
377
  }
378

379
  int32_t idxSize = (int32_t)taosArrayGetSize(pIdx);
235✔
380
  SArray* pNewIdx = taosArrayInit(idxSize, sizeof(int64_t));
235✔
381
  TSDB_CHECK_NULL(pNewIdx, code, lino, _exit, terrno);
235✔
382

383
  int8_t* pIndicator = (int8_t*)pFilterRes->pData;
235✔
384
  int32_t newRowStart = 0;
235✔
385
  for (int32_t i = 0; i < idxSize; ++i) {
470✔
386
    int64_t cur = *(int64_t*)taosArrayGet(pIdx, i);
235✔
387
    int32_t* pCurWinIdx = (int32_t*)&cur;
235✔
388
    int32_t* pCurRowIdx = pCurWinIdx + 1;
235✔
389

390
    int32_t startRow = *pCurRowIdx;
235✔
391
    int32_t endRow = rowsBeforeFilter;
235✔
392
    if (i + 1 < idxSize) {
235✔
393
      int64_t next = *(int64_t*)taosArrayGet(pIdx, i + 1);
×
394
      int32_t* pNextWinIdx = (int32_t*)&next;
×
395
      int32_t* pNextRowIdx = pNextWinIdx + 1;
×
396
      endRow = *pNextRowIdx;
×
397
    }
398

399
    startRow = TMIN(TMAX(startRow, 0), rowsBeforeFilter);
235✔
400
    endRow = TMIN(TMAX(endRow, 0), rowsBeforeFilter);
235✔
401
    if (endRow <= startRow) {
235✔
402
      continue;
×
403
    }
404

405
    int32_t survivedRows = 0;
235✔
406
    for (int32_t r = startRow; r < endRow; ++r) {
470✔
407
      if (pIndicator[r]) {
235✔
408
        survivedRows++;
×
409
      }
410
    }
411

412
    if (survivedRows <= 0) {
235✔
413
      continue;
235✔
414
    }
415

416
    int64_t out = 0;
×
417
    int32_t* pOutWinIdx = (int32_t*)&out;
×
418
    int32_t* pOutRowIdx = pOutWinIdx + 1;
×
419
    *pOutWinIdx = *pCurWinIdx;
×
420
    *pOutRowIdx = newRowStart;
×
421
    TSDB_CHECK_NULL(taosArrayPush(pNewIdx, &out), code, lino, _exit, terrno);
×
422

423
    newRowStart += survivedRows;
×
424
  }
425

426
  taosArrayClear(pIdx);
235✔
427
  int32_t newSize = (int32_t)taosArrayGetSize(pNewIdx);
235✔
428
  if (newSize > 0) {
235✔
429
    void* dest = taosArrayReserve(pIdx, newSize);
×
430
    TSDB_CHECK_NULL(dest, code, lino, _exit, terrno);
×
431
    memcpy(dest, pNewIdx->pData, (size_t)newSize * sizeof(int64_t));
×
432
  }
433

434
_exit:
235✔
435
  taosArrayDestroy(pNewIdx);
235✔
436
  if (code != TSDB_CODE_SUCCESS) {
235✔
437
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
438
  }
439
  return code;
235✔
440
}
441

442

443
static int32_t mergeAlignExtWinSetOutputBuf(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
3,017,458✔
444
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
445
  if (*pResult == NULL) {
3,017,458✔
446
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
17,805✔
447
    if (!*pResult) {
17,805✔
448
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
449
      return terrno;
×
450
    }
451
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
17,805✔
452
  }
453
  
454
  (*pResult)->win = *pWin;
3,017,458✔
455
  (*pResult)->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
3,017,458✔
456
  
457
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
3,016,756✔
458
}
459

460

461
/*
 * Find the window whose start key equals `ts`, searching pExtW->pWins forward
 * from the task's current window index.
 *
 * On a match the current window index is moved to it and *ppWin is set.
 * A window starting after `ts`, or running off the end of the array, is
 * reported as TSDB_CODE_STREAM_INTERNAL_ERROR.
 */
static int32_t mergeAlignExtWinGetWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts, STimeWindow** ppWin) {
  int32_t i = extWinGetCurWinIdx(pOperator->pTaskInfo);

  // TODO handle desc order
  while (i < pExtW->pWins->size) {
    STimeWindow* pCand = taosArrayGet(pExtW->pWins, i);

    if (ts < pCand->skey) {
      // Windows are scanned in ascending order, so ts can no longer match.
      qError("invalid ts %" PRId64 " for current window idx %d skey %" PRId64, ts, i, pCand->skey);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    if (ts == pCand->skey) {
      extWinSetCurWinIdx(pOperator, i);
      *ppWin = pCand;
      return TSDB_CODE_SUCCESS;
    }

    ++i;
  }

  qError("invalid ts %" PRId64 " to find merge aligned ext window, size:%d", ts, (int32_t)pExtW->pWins->size);
  return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
480

481
/*
 * Finalize the current window's result row into pResultBlock and, when it
 * produced rows, record the window's start row in pExtW->pWinRowIdx.
 *
 * @return 0 on success, otherwise the error from extWinAppendWinIdx.
 */
static int32_t mergeAlignExtWinFinalizeResult(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pResultBlock) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SResultRow*                          pRow = pMlExtInfo->pResultRow;
  int32_t                              code = 0, lino = 0;

  finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, &pOperator->exprSupp, pResultBlock, pOperator->pTaskInfo);

  if (pRow->numOfRows > 0) {
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResultBlock, pRow->winIdx, pRow->numOfRows));
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
502

503
/*
 * Aggregate one input block for the merge-aligned external window operator.
 *
 * Rows with the same timestamp form one window. Each run of equal timestamps
 * is fed to the aggregate functions; whenever the timestamp changes the
 * finished window is finalized into pResultBlock, the shared result row is
 * reset, and the window matching the new timestamp is looked up.
 * The last run of the block is only accumulated, not finalized, because the
 * next block may continue it.
 *
 * Any failure is logged and leaves through T_LONG_JMP.
 */
static int32_t mergeAlignExtWinAggDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                           pSup = &pOperator->exprSupp;
  STimeWindow*                         pWin = NULL;
  int32_t                              code = 0, lino = 0;

  int32_t  startPos = 0;  // first row of the run being accumulated
  int64_t* tsCols = extWinExtractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
  TSKEY    ts = getStartTsKey(&pBlock->info.window, tsCols);

  code = mergeAlignExtWinGetWinFromTs(pOperator, pExtW, ts, &pWin);
  if (code) {
    qError("failed to get time window for ts:%" PRId64 ", prim ts index:%d, error:%s", ts, pExtW->primaryTsIndex, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

  // The block starts a different window than the one left open by the previous block.
  if (pMlExtInfo->curTs != INT64_MIN && pMlExtInfo->curTs != pWin->skey) {
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
  }

  TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));

  pMlExtInfo->curTs = pWin->skey;

  int32_t currPos;
  for (currPos = startPos + 1; currPos < pBlock->info.rows; ++currPos) {
    if (tsCols[currPos] == pMlExtInfo->curTs) {
      continue;
    }

    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
      pMlExtInfo->curTs, startPos, currPos, tsCols[currPos]);

    // Close the run [startPos, currPos).
    TAOS_CHECK_EXIT(applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
                                           currPos - startPos, pBlock->info.rows, pSup->numOfExprs));

    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));

    // Open the window of the new timestamp.
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[currPos], &pWin));

    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
    startPos = currPos;

    TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));

    pMlExtInfo->curTs = pWin->skey;
  }

  // Accumulate the trailing run; it stays open until the next block or end of input.
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
                                         currPos - startPos, pBlock->info.rows, pSup->numOfExprs);

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return code;
}
565

566
/*
 * Build the (window index, start row) entries for a projected block.
 *
 * For every distinct timestamp of pInput the matching window is looked up and
 * an entry is appended to pExtW->pWinRowIdx.
 *
 * The operator's info is a SMergeAlignedExternalWindowOperator (that is what
 * createMergeAlignedExternalWindowOperator registers), so the external window
 * state has to be reached through its pExtW member. The previous code used
 * pOperator->info directly as SExternalWindowOperator*, which read the wrong
 * structure.
 *
 * NOTE(review): tsCols is NULL when the input block's data is not loaded
 * (see extWinExtractTsCol); this function assumes it is loaded - confirm.
 *
 * @return 0 on success, otherwise the first error encountered.
 */
static int32_t mergeAlignExtWinBuildWinRowIdx(SOperatorInfo* pOperator, SSDataBlock* pInput, SSDataBlock* pResult) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  int64_t* tsCols = extWinExtractTsCol(pInput, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  STimeWindow* pWin = NULL;
  int32_t code = 0, lino = 0;
  int64_t prevTs = INT64_MIN;

  for (int32_t i = 0; i < pInput->info.rows; ++i) {
    if (prevTs == tsCols[i]) {
      // Still inside the window of the previous row.
      continue;
    }

    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[i], &pWin));
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResult, extWinGetCurWinIdx(pOperator->pTaskInfo), pInput->info.rows - i));

    prevTs = tsCols[i];
  }

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
592

593
/*
 * Scalar (projection) path of the merge-aligned external window operator:
 * apply the projection expressions of pExtW->scalarSupp to pBlock, writing
 * into pResultBlock, then record which result rows belong to which window.
 *
 * The operator's info is a SMergeAlignedExternalWindowOperator, so the
 * external window state is taken from its pExtW member. The previous code
 * cast pOperator->info straight to SExternalWindowOperator*, which made
 * scalarSupp point into the wrong structure.
 *
 * @return 0 on success, otherwise the first error encountered.
 */
static int32_t mergeAlignExtWinProjectDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                            SSDataBlock* pResultBlock) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SExprSupp*                           pExprSup = &pExtW->scalarSupp;
  int32_t                              code = 0, lino = 0;

  TAOS_CHECK_EXIT(projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
                        GET_STM_RTINFO(pOperator->pTaskInfo)));

  TAOS_CHECK_EXIT(mergeAlignExtWinBuildWinRowIdx(pOperator, pBlock, pResultBlock));

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
612

613
/*
 * Pull blocks from downstream and process them until either the input is
 * exhausted or the result block holds at least resultInfo.threshold rows.
 *
 * At end of input the still-open window is finalized (aggregate mode only)
 * and the operator is marked completed. On success the window/row index array
 * is published through funcInfo.pStreamBlkWinIdx. Any failure is stored in
 * the task and leaves through T_LONG_JMP.
 */
void mergeAlignExtWinDo(SOperatorInfo* pOperator) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                           pSup = &pOperator->exprSupp;
  SSDataBlock*                         pRes = pExtW->binfo.pRes;
  int32_t                              code = 0;
  int32_t                              lino = 0;

  taosArrayClear(pExtW->pWinRowIdx);
  blockDataCleanup(pRes);

  for (;;) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);

    if (NULL == pBlock) {
      // close last time window
      if (EEXT_MODE_AGG == pExtW->mode && INT64_MIN != pMlExtInfo->curTs) {
        TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, &pExtW->binfo.resultRowInfo, pRes));
      }
      setOperatorCompleted(pOperator);
      break;
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _exit);

    printDataBlock(pBlock, __func__, "externalwindowAlign", pTaskInfo->id.queryId);
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);

    if (EEXT_MODE_SCALAR != pExtW->mode) {
      TAOS_CHECK_EXIT(mergeAlignExtWinAggDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
    } else {
      TAOS_CHECK_EXIT(mergeAlignExtWinProjectDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
    }

    // Enough rows collected for one output batch.
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }

  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
18,967✔
666

667
/*
 * "Get next" entry point of the merge-aligned external window operator.
 *
 * On the first call after a reset the window array is filled from the
 * stream's pStreamPesudoFuncVals: one window per parameter, spanning
 * [wstart, wstart + 1]. Then one batch is produced by mergeAlignExtWinDo.
 *
 * *ppRes is the operator's result block, or NULL when no rows were produced
 * or the operator has already finished.
 */
static int32_t mergeAlignExtWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  int32_t                              code = 0;
  int32_t                              lino = 0;

  if (OP_EXEC_DONE == pOperator->status) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pRes = pExtW->binfo.pRes;
  blockDataCleanup(pRes);

  if (taosArrayGetSize(pExtW->pWins) <= 0) {
    // Windows not loaded yet: derive them from the trigger parameters.
    size_t       numOfParams = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
    STimeWindow* pWinBuf = taosArrayReserve(pExtW->pWins, numOfParams);
    TSDB_CHECK_NULL(pWinBuf, code, lino, _exit, terrno);

    for (int32_t i = 0; i < numOfParams; ++i) {
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
      STimeWindow*        pSlot = &pWinBuf[i];
      pSlot->skey = pParam->wstart;
      pSlot->ekey = pParam->wstart + 1;
    }

    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  }

  mergeAlignExtWinDo(pOperator);

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;
  if (rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pRes;
  }

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
711

712
/*
 * Reset-state callback of the merge-aligned external window operator.
 *
 * Puts the operator back to OP_NOT_OPENED, forgets the loaded windows and the
 * current window/result row, and rebuilds the aggregate support and the time
 * window column.
 *
 * @return 0 on success, otherwise the error from resetAggSup or
 *         initExecTimeWindowInfo.
 */
int32_t resetMergeAlignedExtWinOperator(SOperatorInfo* pOperator) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedIntervalPhysiNode*      pPhynode = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;

  pOperator->status = OP_NOT_OPENED;

  taosArrayClear(pExtW->pWins);

  resetBasicOperatorState(&pExtW->binfo);
  pMlExtInfo->pResultRow = NULL;
  pMlExtInfo->curTs = INT64_MIN;

  int32_t code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                             &pTaskInfo->storageAPI.functionStore);
  if (code != 0) {
    return code;
  }

  // Recreate the time window column from scratch.
  colDataDestroy(&pExtW->twAggSup.timeWindowData);
  code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
  return code;
}
734

735
int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
17,345✔
736
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
737
  SMergeAlignedIntervalPhysiNode* pPhynode = (SMergeAlignedIntervalPhysiNode*)pNode;
17,345✔
738
  int32_t code = 0;
17,345✔
739
  int32_t lino = 0;
17,345✔
740
  SMergeAlignedExternalWindowOperator* pMlExtInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
17,345✔
741
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
17,345✔
742

743
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
17,345✔
744
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
17,345✔
745
  }
746
  pOperator->pPhyNode = pNode;
17,345✔
747
  if (!pMlExtInfo || !pOperator) {
17,345✔
748
    code = terrno;
×
749
    goto _error;
×
750
  }
751

752
  pMlExtInfo->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
17,345✔
753
  if (!pMlExtInfo->pExtW) {
17,345✔
754
    code = terrno;
×
755
    goto _error;
×
756
  }
757

758
  SExternalWindowOperator* pExtW = pMlExtInfo->pExtW;
17,345✔
759
  SExprSupp* pSup = &pOperator->exprSupp;
17,345✔
760
  pSup->hasWindowOrGroup = true;
17,345✔
761
  pSup->hasWindow = true;
17,345✔
762
  pMlExtInfo->curTs = INT64_MIN;
17,345✔
763

764
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
17,345✔
765
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
17,345✔
766
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
17,345✔
767
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
17,345✔
768

769
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
17,345✔
770
  initResultSizeInfo(&pOperator->resultInfo, 4096);
17,345✔
771

772
  int32_t num = 0;
17,345✔
773
  SExprInfo* pExprInfo = NULL;
17,345✔
774
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
17,345✔
775
  QUERY_CHECK_CODE(code, lino, _error);
17,345✔
776

777
  if (pExtW->mode == EEXT_MODE_AGG) {
17,345✔
778
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, NULL,
17,345✔
779
                      &pTaskInfo->storageAPI.functionStore);
780
    QUERY_CHECK_CODE(code, lino, _error);
17,345✔
781
  }
782

783
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
17,345✔
784
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
17,345✔
785
  initBasicInfo(&pExtW->binfo, pResBlock);
17,345✔
786

787
  pExtW->pWins = taosArrayInit(4096, sizeof(STimeWindow));
17,345✔
788
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
17,345✔
789

790
  pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
17,345✔
791
  TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
17,345✔
792

793
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
17,345✔
794
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
17,345✔
795
  QUERY_CHECK_CODE(code, lino, _error);
17,345✔
796
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMlExtInfo, pTaskInfo);
17,345✔
797
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignExtWinNext, NULL,
17,345✔
798
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
799
                                         optrDefaultGetNextExtFn, NULL);
800
  setOperatorResetStateFn(pOperator, resetMergeAlignedExtWinOperator);
17,345✔
801

802
  code = appendDownstream(pOperator, &pDownstream, 1);
17,345✔
803
  QUERY_CHECK_CODE(code, lino, _error);
17,345✔
804
  *ppOptrOut = pOperator;
17,345✔
805
  return code;
17,345✔
806
  
807
_error:
×
808
  if (pMlExtInfo) destroyMergeAlignedExternalWindowOperator(pMlExtInfo);
×
809
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
810
  pTaskInfo->code = code;
×
811
  return code;
×
812
}
813

814
static int32_t resetExternalWindowExprSupp(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
348,180✔
815
                                           SExternalWindowPhysiNode* pPhynode) {
816
  int32_t    code = 0, lino = 0, num = 0;
348,180✔
817
  SExprInfo* pExprInfo = NULL;
348,180✔
818
  cleanupExprSuppWithoutFilter(&pExtW->scalarSupp);
348,180✔
819

820
  SNodeList* pNodeList = NULL;
348,180✔
821
  if (pPhynode->window.pProjs) {
348,180✔
822
    pNodeList = pPhynode->window.pProjs;
×
823
  } else {
824
    pNodeList = pPhynode->window.pExprs;
348,180✔
825
  }
826

827
  code = createExprInfo(pNodeList, NULL, &pExprInfo, &num);
348,180✔
828
  QUERY_CHECK_CODE(code, lino, _error);
348,180✔
829
  code = initExprSupp(&pExtW->scalarSupp, pExprInfo, num, &pTaskInfo->storageAPI.functionStore);
348,180✔
830
  QUERY_CHECK_CODE(code, lino, _error);
348,180✔
831
  return code;
348,180✔
832
_error:
×
833
  if (code != TSDB_CODE_SUCCESS) {
×
834
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
835
    pTaskInfo->code = code;
×
836
  }
837
  return code;
×
838
}
839

840
/**
 * Reset an external window operator so it can be executed again.
 *
 * The operator is marked OP_NOT_OPENED, its result-row info and window list are cleared, the last
 * block node is recycled, and the scalar expression support and time-window column are rebuilt.
 * The output cursor fields (outWinIdx, lastSKey, lastEKey, isDynWindow) are only reset once both
 * rebuild steps have succeeded.
 *
 * @return 0 on success, otherwise the code of the step that failed
 */
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
  // NOTE: TAOS_CHECK_EXIT relies on the local names `code`, `lino` and the `_exit` label.
  int32_t                   code = 0, lino = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SExternalWindowOperator*  pExtW = pOperator->info;
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pOperator->pPhyNode;

  pOperator->status = OP_NOT_OPENED;
  initResultRowInfo(&pExtW->binfo.resultRowInfo);

  // Forget the windows and output progress of the previous run.
  pExtW->outputWinId = 0;
  pExtW->lastWinId = -1;
  pExtW->outputWinNum = 0;
  taosArrayClear(pExtW->pWins);
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);

  // Rebuild the scalar expressions, then the time-window column.
  TAOS_CHECK_EXIT(resetExternalWindowExprSupp(pExtW, pTaskInfo, pPhynode));
  colDataDestroy(&pExtW->twAggSup.timeWindowData);
  TAOS_CHECK_EXIT(initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window));

  pExtW->outWinIdx = 0;
  pExtW->lastSKey = INT64_MIN;
  pExtW->lastEKey = INT64_MIN;
  pExtW->isDynWindow = false;

  qDebug("%s ext window stat at reset, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64,
         pTaskInfo->id.str, pExtW->stat.resBlockCreated, pExtW->stat.resBlockDestroyed, pExtW->stat.resBlockRecycled,
         pExtW->stat.resBlockReused, pExtW->stat.resBlockAppend);

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
885

886
/**
 * Node-walker callback that detects a count-like function.
 *
 * When pNode is a function node whose funcId, or whose originalFuncId if hasOriginalFunc is set,
 * is count-like, the bool pointed to by res is set to true and the walk is ended.
 *
 * @param pNode node being visited
 * @param res   points to a bool that is set to true on a match (never cleared here)
 * @return DEAL_RES_END on a match, DEAL_RES_CONTINUE otherwise
 */
static EDealRes extWinHasCountLikeFunc(SNode* pNode, void* res) {
  if (QUERY_NODE_FUNCTION != nodeType(pNode)) {
    return DEAL_RES_CONTINUE;
  }

  SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
  bool           countLike = fmIsCountLikeFunc(pFuncNode->funcId) ||
                   (pFuncNode->hasOriginalFunc && fmIsCountLikeFunc(pFuncNode->originalFuncId));
  if (!countLike) {
    return DEAL_RES_CONTINUE;
  }

  *(bool*)res = true;
  return DEAL_RES_END;
}
896

897

898
/**
 * Build a one-row, all-NULL input block for an operator that has a count-like function.
 *
 * Nothing is created, and *ppBlock is left untouched, when tsCountAlwaysReturnValue is off or the
 * operator's hasCountFunc flag is not set. Otherwise the block gets one NULL-typed column for every
 * slot up to the largest slotId referenced by a column parameter of the operator's expressions,
 * and row 0 of each column is set to NULL.
 *
 * @param pOperator operator whose exprSupp describes the referenced columns
 * @param ppBlock   receives the new block on success; the caller takes ownership
 * @return TSDB_CODE_SUCCESS on success or when nothing has to be created; otherwise the failing
 *         code, in which case the partially built block is destroyed
 */
static int32_t extWinCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SSDataBlock* pBlock = NULL;
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SExternalWindowOperator* pExtW = pOperator->info;

  if (!pExtW->hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    // Template for every placeholder column: NULL type, one byte wide.
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        // Only column parameters reference a slot of the input block.
        continue;
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId >= numOfCols) {
        code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
        QUERY_CHECK_CODE(code, lino, _end);

        // Pad the column array so that index slotId exists.
        for (int32_t k = numOfCols; k < slotId + 1; ++k) {
          void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
          QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
        }
      }
    }
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
    colDataSetNULL(pColInfoData, 0);
  }
  *ppBlock = pBlock;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    // Report the line recorded at the failing check, not the line of this log statement.
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
964

965

966

967
static int extWinTsWinCompare(const void* pLeft, const void* pRight) {
17,463,166✔
968
  int64_t ts = *(int64_t*)pLeft;
17,463,166✔
969
  SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)pRight;
17,463,624✔
970
  if (ts < pWin->tw.skey) {
17,463,624✔
971
    return -1;
9,930,636✔
972
  }
973
  if (ts >= pWin->tw.ekey) {
7,533,446✔
974
    return 1;
3,887,168✔
975
  }
976

977
  return 0;
3,646,278✔
978
}
979

980

981
/**
 * Find the first window that contains a row of the block.
 *
 * The window list is first searched for a window containing tsCol[0]. If none matches, rows 1..rowNum-1
 * are scanned while a window cursor moves forward only, until a row falls inside a window.
 *
 * @param tsCol    timestamps of the block's rows
 * @param rowNum   number of rows in tsCol
 * @param startPos receives the index of the first row inside the returned window
 * @return index of the window in pExtW->pWins, or -1 when no row falls inside any window
 */
static int32_t extWinGetMultiTbWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol, int64_t rowNum, int32_t* startPos) {
  int32_t hit = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
  if (hit >= 0) {
    *startPos = 0;
    return hit;
  }

  int32_t winIdx = 0;
  int64_t row = 1;  // row 0 was covered by the search above
  while (row < rowNum) {
    while (winIdx < pExtW->pWins->size) {
      SExtWinTimeWindow* pCur = TARRAY_GET_ELEM(pExtW->pWins, winIdx);
      if (tsCol[row] < pCur->tw.skey) {
        // Row lies before this window; try the next row against the same window.
        break;
      }

      if (tsCol[row] < pCur->tw.ekey) {
        *startPos = row;
        return winIdx;
      }

      ++winIdx;
    }

    ++row;
  }

  return -1;
}
1006

1007
/**
 * Fetch the next window for the current block when windows do not overlap.
 *
 * On the first call for a block (blkWinIdx < 0) the window cursor starts at the task's current
 * window index; on later calls it moves to the following window. Rows before a window's skey are
 * skipped. A window is returned when the current row lies inside it, or, when the task has no
 * stream runtime info, also when the row is at or past its ekey.
 *
 * @param tsCol    timestamps of the block's rows
 * @param startPos in: row to start from; out: first row handed to the returned window
 * @param pInfo    block info; its rows field bounds the scan
 * @param ppWin    receives the window, or NULL when the block has nothing more to offer
 * @param winRows  receives the row count computed by getNumOfRowsInTimeWindow()
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t extWinGetNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  // Every exit except a successful match reports "no window".
  *ppWin = NULL;

  if ((*startPos) >= pInfo->rows) {
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
    return TSDB_CODE_SUCCESS;
  }

  if (pExtW->blkWinIdx >= 0) {
    pExtW->blkWinIdx++;
  } else {
    pExtW->blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
  }

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  const int64_t      blkLastTs = tsCol[pInfo->rows - 1];
  if (blkLastTs < pWin->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, blkLastTs, pExtW->blkWinIdx, pWin->tw.skey);
    return TSDB_CODE_SUCCESS;
  }

  int32_t r = *startPos;

  qDebug("%s %s start to get novlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, r);

  // TODO handle desc order
  while (pExtW->blkWinIdx < pExtW->pWins->size) {
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // Skip the rows that precede this window.
    while (r < pInfo->rows && tsCol[r] < pWin->tw.skey) {
      ++r;
    }

    if (r == pInfo->rows) {
      break;
    }

    // The row is at or after skey here. It is either inside the window, or past its end; in the
    // latter case the window is still handed out when the task has no stream runtime info.
    if (tsCol[r] < pWin->tw.ekey || !pOperator->pTaskInfo->pStreamRuntimeInfo) {
      extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
      *ppWin = pWin;
      *startPos = r;
      *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

      qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
             GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);

      return TSDB_CODE_SUCCESS;
    }

    ++pExtW->blkWinIdx;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  return TSDB_CODE_SUCCESS;
}
1087

1088
/**
 * Fetch the next window for the current block when windows may overlap.
 *
 * On the first call for a block (blkWinIdx < 0) the window cursor starts at blkWinStartIdx; on
 * later calls it moves to the following window. For each window the row scan restarts from
 * blkRowStartIdx, which is advanced past every row found to precede a window's skey.
 *
 * @param tsCol    timestamps of the block's rows
 * @param startPos receives the first row inside the returned window
 * @param pInfo    block info; its rows field bounds the scan
 * @param ppWin    receives the window, or NULL when the block has nothing more to offer
 * @param winRows  receives the row count computed by getNumOfRowsInTimeWindow()
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t extWinGetOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  // Every exit except a successful match reports "no window".
  *ppWin = NULL;

  if (pExtW->blkWinIdx >= 0) {
    pExtW->blkWinIdx++;
  } else {
    pExtW->blkWinIdx = pExtW->blkWinStartIdx;
  }

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  const int64_t      blkLastTs = tsCol[pInfo->rows - 1];
  if (blkLastTs < pWin->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, blkLastTs, pExtW->blkWinIdx, pWin->tw.skey);
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s %s start to get ovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);

  // TODO handle desc order
  while (pExtW->blkWinIdx < pExtW->pWins->size) {
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // Skip rows before this window and remember how far the block has been consumed.
    int64_t r = pExtW->blkRowStartIdx;
    while (r < pInfo->rows && tsCol[r] < pWin->tw.skey) {
      ++r;
      pExtW->blkRowStartIdx = r;
    }

    if (r >= pInfo->rows) {
      // Block exhausted: unless a start window was already chosen, the next block resumes here.
      if (!pExtW->blkWinStartSet) {
        pExtW->blkWinStartIdx = pExtW->blkWinIdx;
      }

      break;
    }

    if (tsCol[r] < pWin->tw.ekey) {
      extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
      *ppWin = pWin;
      *startPos = r;
      *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

      qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
             GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);

      // The window ends inside this block, so the next block starts from the following window.
      if ((r + *winRows) < pInfo->rows) {
        pExtW->blkWinStartIdx = pExtW->blkWinIdx + 1;
        pExtW->blkWinStartSet = true;
      }

      return TSDB_CODE_SUCCESS;
    }

    // The row is at or past this window's ekey: try the next window.
    ++pExtW->blkWinIdx;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  return TSDB_CODE_SUCCESS;
}
1159

1160

1161
/**
 * Fetch the next window for the current block in multi-table mode with non-overlapping windows.
 *
 * On the first call for a block (blkWinIdx < 0) the starting window is located from the block's
 * timestamps through extWinGetMultiTbWinFromTs() and returned directly. On later calls the cursor
 * moves to the following window and rows before a window's skey are skipped. A window is returned
 * when the current row lies inside it, or, when the task has no stream runtime info, also when the
 * row is at or past its ekey.
 *
 * @param tsCol    timestamps of the block's rows
 * @param startPos in: row to start from; out: first row handed to the returned window
 * @param pInfo    block info; its rows field bounds the scan
 * @param ppWin    receives the window, or NULL when the block has nothing more to offer
 * @param winRows  receives the row count computed by getNumOfRowsInTimeWindow()
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t extWinGetMultiTbNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  // Every exit except a successful match reports "no window".
  *ppWin = NULL;

  if ((*startPos) >= pInfo->rows) {
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
    return TSDB_CODE_SUCCESS;
  }

  if (pExtW->blkWinIdx < 0) {
    // First request for this block: derive the starting window from the block's timestamps.
    pExtW->blkWinIdx = extWinGetMultiTbWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
    if (pExtW->blkWinIdx < 0) {
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block",
             GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
      return TSDB_CODE_SUCCESS;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  pExtW->blkWinIdx++;

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  const int64_t      blkLastTs = tsCol[pInfo->rows - 1];
  if (blkLastTs < pWin->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, blkLastTs, pExtW->blkWinIdx, pWin->tw.skey);
    return TSDB_CODE_SUCCESS;
  }

  int32_t r = *startPos;

  qDebug("%s %s start to get mnovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, r);

  // TODO handle desc order
  while (pExtW->blkWinIdx < pExtW->pWins->size) {
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // Skip the rows that precede this window.
    while (r < pInfo->rows && tsCol[r] < pWin->tw.skey) {
      ++r;
    }

    if (r == pInfo->rows) {
      break;
    }

    // The row is at or after skey here. It is either inside the window, or past its end; in the
    // latter case the window is still handed out when the task has no stream runtime info.
    if (tsCol[r] < pWin->tw.ekey || !pOperator->pTaskInfo->pStreamRuntimeInfo) {
      extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
      *ppWin = pWin;
      *startPos = r;
      *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

      qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
             GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);

      return TSDB_CODE_SUCCESS;
    }

    ++pExtW->blkWinIdx;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  return TSDB_CODE_SUCCESS;
}
1256

1257
/**
 * Find the earliest window that contains a row of the block.
 *
 * The window list is first searched for a window containing tsCol[0]; on a hit the index is moved
 * back over any directly preceding windows that also contain tsCol[0]. If nothing contains
 * tsCol[0], rows 1..rowNum-1 are scanned while a window cursor moves forward only.
 *
 * @param tsCol    timestamps of the block's rows
 * @param rowNum   number of rows in tsCol
 * @param startPos receives the index of the first row inside the returned window
 * @return index of the window in pExtW->pWins, or -1 when no row falls inside any window
 */
static int32_t extWinGetFirstWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol,
                                       int64_t rowNum, int32_t* startPos) {
  int32_t hit = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
  if (hit >= 0) {
    // Walk back while the previous window still covers the first timestamp.
    while (hit > 0 && extWinTsWinCompare(tsCol, TARRAY_GET_ELEM(pExtW->pWins, hit - 1)) == 0) {
      --hit;
    }

    *startPos = 0;
    return hit;
  }

  int32_t winIdx = 0;
  int64_t row = 1;  // row 0 was covered by the search above
  while (row < rowNum) {
    while (winIdx < pExtW->pWins->size) {
      SExtWinTimeWindow* pCur = TARRAY_GET_ELEM(pExtW->pWins, winIdx);
      if (tsCol[row] < pCur->tw.skey) {
        // Row lies before this window; try the next row against the same window.
        break;
      }

      if (tsCol[row] < pCur->tw.ekey) {
        *startPos = row;
        return winIdx;
      }

      ++winIdx;
    }

    ++row;
  }

  return -1;
}
1292

1293
/**
 * Fetch the next window for the current block in multi-table mode with overlapping windows.
 *
 * On the first call for a block (blkWinIdx < 0) the starting window is located from the block's
 * timestamps through extWinGetFirstWinFromTs() and returned directly. On later calls the cursor
 * moves to the following window; for each window the row scan restarts from blkRowStartIdx, which
 * is advanced past every row found to precede a window's skey.
 *
 * @param tsCol    timestamps of the block's rows
 * @param startPos receives the first row inside the returned window
 * @param pInfo    block info; its rows field bounds the scan
 * @param ppWin    receives the window, or NULL when the block has nothing more to offer
 * @param winRows  receives the row count computed by getNumOfRowsInTimeWindow()
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t extWinGetMultiTbOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  // Every exit except a successful match reports "no window".
  *ppWin = NULL;

  if (pExtW->blkWinIdx < 0) {
    // First request for this block: derive the starting window from the block's timestamps.
    pExtW->blkWinIdx = extWinGetFirstWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
    if (pExtW->blkWinIdx < 0) {
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block",
             GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
      return TSDB_CODE_SUCCESS;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  pExtW->blkWinIdx++;

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  const int64_t      blkLastTs = tsCol[pInfo->rows - 1];
  if (blkLastTs < pWin->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, blkLastTs, pExtW->blkWinIdx, pWin->tw.skey);
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s %s start to get movlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);

  // TODO handle desc order
  while (pExtW->blkWinIdx < pExtW->pWins->size) {
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // Skip rows before this window and remember how far the block has been consumed.
    int64_t r = pExtW->blkRowStartIdx;
    while (r < pInfo->rows && tsCol[r] < pWin->tw.skey) {
      ++r;
      pExtW->blkRowStartIdx = r;
    }

    if (r >= pInfo->rows) {
      break;
    }

    if (tsCol[r] < pWin->tw.ekey) {
      extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
      *ppWin = pWin;
      *startPos = r;
      *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

      qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
             GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);

      return TSDB_CODE_SUCCESS;
    }

    // The row is at or past this window's ekey: try the next window.
    ++pExtW->blkWinIdx;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  return TSDB_CODE_SUCCESS;
}
1370

1371

1372
/**
 * Locate the first row after lastEndPos that belongs to window `win` in an ascending block.
 *
 * Return values:
 *    0  a row with win.skey <= ts < win.ekey was found; *nextPos holds its index
 *   -1  the window starts after the block's last timestamp, or no row from lastEndPos + 1 to the
 *       end of the block reaches win.skey
 *   -2  the window ends at or before the block's first timestamp, or at or before the first row
 *       that reaches win.skey (the window holds no row of this block)
 *
 * Only TSDB_ORDER_ASC is handled. For any other order the function returns -1 without scanning;
 * the scan conditions can never match in that case, so looping would run past the end of tsCol.
 * The scan is bounded by pBlockInfo->rows for the same reason.
 *
 * @param win        window to place
 * @param pBlockInfo block info providing the block's time range and row count
 * @param lastEndPos index of the last row already consumed; scanning starts at lastEndPos + 1
 *                   (presumably -1 for a fresh block; verify against the caller)
 * @param order      timestamp order of the block
 * @param nextPos    receives the matching row index when 0 is returned
 * @param tsCol      timestamps of the block's rows
 */
static int32_t extWinGetWinStartPos(STimeWindow win, const SDataBlockInfo* pBlockInfo, int32_t lastEndPos, int32_t order, int32_t* nextPos, int64_t* tsCol) {
  if (order != TSDB_ORDER_ASC) {
    // Descending input is not supported here.
    return -1;
  }

  if (win.ekey <= pBlockInfo->window.skey) {
    return -2;
  }

  if (win.skey > pBlockInfo->window.ekey) {
    return -1;
  }

  for (int64_t pos = (int64_t)lastEndPos + 1; pos < pBlockInfo->rows; ++pos) {
    if (win.ekey <= tsCol[pos]) {
      return -2;
    }

    if (win.skey <= tsCol[pos]) {
      *nextPos = (int32_t)pos;
      return 0;
    }
  }

  // No remaining row reaches the window start.
  return -1;
}
1392

1393
static int32_t extWinAggSetWinOutputBuf(SOperatorInfo* pOperator, SExtWinTimeWindow* win, SExprSupp* pSupp, 
2,147,483,647✔
1394
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
1395
  int32_t code = 0, lino = 0;
2,147,483,647✔
1396
  SResultRow* pResultRow = NULL;
2,147,483,647✔
1397
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
2,147,483,647✔
1398
  
1399
#if 0
1400
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
1401
                                                  true, tableGroupId, pTaskInfo, true, pAggSup, true);
1402
  if (pResultRow == NULL) {
1403
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
1404
    return pTaskInfo->code;
1405
  }
1406

1407
  qDebug("current result rows num:%d", tSimpleHashGetSize(pAggSup->pResultRowHashTable));
1408

1409
#else
1410
  if (win->winOutIdx >= 0) {
2,147,483,647✔
1411
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
2,147,483,647✔
1412
  } else {
1413
    win->winOutIdx = pExtW->outWinIdx++;
2,147,483,647✔
1414
    
1415
    qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
2,147,483,647✔
1416

1417
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
2,147,483,647✔
1418
    
1419
    memset(pResultRow, 0, pAggSup->resultRowSize);
2,147,483,647✔
1420

1421
    pResultRow->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,147,483,647✔
1422
    TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
2,147,483,647✔
1423
  }
1424
#endif
1425

1426
  // set time window for current result
1427
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
2,147,483,647✔
1428

1429
_exit:
2,147,483,647✔
1430
  
1431
  if (code) {
2,147,483,647✔
1432
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1433
  }
1434

1435
  return code;
2,147,483,647✔
1436
}
1437

1438
static int32_t extWinAggDo(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
2,147,483,647✔
1439
                                  SSDataBlock* pInputBlock) {
1440
  if (pOperator->pTaskInfo->pStreamRuntimeInfo && forwardRows == 0) {
2,147,483,647✔
1441
    return TSDB_CODE_SUCCESS;
×
1442
  }
1443

1444
  SExprSupp*               pSup = &pOperator->exprSupp;
2,147,483,647✔
1445
  SExternalWindowOperator* pExtW = pOperator->info;
2,147,483,647✔
1446
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
2,147,483,647✔
1447
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
2,147,483,647✔
1448

1449
}
1450

1451
/**
 * Decide whether the most recently assigned output window (outWinIdx - 1) is closed.
 *
 * Returns false when no output window exists yet, or in multi-table mode without ordered input.
 * Otherwise returns true when there is no time-range expression that needs calculation, when the
 * window's block list is empty, or when the first int32 of the last entry in the tail node's
 * index array is below blkWinStartIdx.
 */
static bool extWinLastWinClosed(SExternalWindowOperator* pExtW) {
  if (pExtW->outWinIdx <= 0) {
    return false;
  }

  if (pExtW->multiTableMode && !pExtW->inputHasOrder) {
    return false;
  }

  if (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc) {
    return true;
  }

  SList* pBlocks = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outWinIdx - 1);
  if (0 == listNEles(pBlocks)) {
    return true;
  }

  // The node payload is read as an array of pointers; slot 1 is the window-index array.
  SListNode* pTail = listTail(pBlocks);
  SArray*    pBlkWinIdx = ((SArray**)pTail->data)[1];

  // NOTE(review): only the first int32 of the 64-bit entry is compared; it presumably holds the
  // window index of the entry, confirm against the code that appends to this array.
  int64_t* pLast = taosArrayGetLast(pBlkWinIdx);
  return pLast != NULL && *(int32_t*)pLast < pExtW->blkWinStartIdx;
}
1474

1475
/**
 * Obtain the result block, and its window-index array, that rows of a window are written to.
 *
 * The block list is chosen as follows:
 *   - the window already has an output index: its existing list is used;
 *   - the last output window is closed (extWinLastWinClosed()): the window takes over output
 *     index outWinIdx - 1 and that list;
 *   - otherwise the window gets the next output index and a new list, which replaces the slot's
 *     previous list after that one has been recycled.
 * The block itself is then taken from the list by extWinGetLastBlockFromList().
 *
 * @param rows  number of rows about to be written
 * @param pWin  window being written; winOutIdx is assigned on first use
 * @param ppRes receives the result block
 * @param ppIdx receives the window-index array that goes with the block
 * @return TSDB_CODE_SUCCESS on success, otherwise the failing code
 */
static int32_t extWinGetWinResBlock(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
  // NOTE: TAOS_CHECK_EXIT relies on the local names `code`, `lino` and the `_exit` label.
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SList*                   pList = NULL;

  if (pWin->winOutIdx >= 0) {
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
  } else if (extWinLastWinClosed(pExtW)) {
    pWin->winOutIdx = pExtW->outWinIdx - 1;
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
  } else {
    pWin->winOutIdx = pExtW->outWinIdx++;

    // Each list node carries two pointers.
    pList = tdListNew(POINTER_BYTES * 2);
    TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);

    SList** ppSlot = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
    extWinRecycleBlockList(pExtW, ppSlot);
    *ppSlot = pList;
  }

  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1506

1507
/*
 * Run the scalar projection for one window over a slice of the input block.
 *
 * Steps:
 *   1. obtain the window's result block and its window-index array via extWinGetWinResBlock();
 *   2. (re)use the operator's pTmpBlock: created from pInputBlock on first use, cleaned otherwise;
 *   3. copy `rows` rows starting at `startPos` from pInputBlock into pTmpBlock;
 *   4. apply the scalar expressions from pTmpBlock into the result block;
 *   5. call extWinAppendWinIdx() with the current window index and `rows`.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinProjectDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExprSupp*               pScalarSup = &pExtW->scalarSupp;
  SSDataBlock*             pOutBlock = NULL;
  SArray*                  pWinRowIdx = NULL;

  TAOS_CHECK_EXIT(extWinGetWinResBlock(pOperator, rows, pWin, &pOutBlock, &pWinRowIdx));

  qDebug("%s %s win[%" PRId64 ", %" PRId64 "] got res block %p winRowIdx %p, winOutIdx:%d, capacity:%d",
      pOperator->pTaskInfo->id.str, __func__, pWin->tw.skey, pWin->tw.ekey, pOutBlock, pWinRowIdx, pWin->winOutIdx, pOutBlock->info.capacity);

  if (pExtW->pTmpBlock) {
    blockDataCleanup(pExtW->pTmpBlock);
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
  }

  // capacity of at least one row is requested even when `rows` is 0
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));

  qDebug("%s %s start to copy %d rows to tmp blk", pOperator->pTaskInfo->id.str, __func__, rows);
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));

  qDebug("%s %s start to apply project to tmp blk", pOperator->pTaskInfo->id.str, __func__);
  TAOS_CHECK_EXIT(projectApplyFunctionsWithSelect(pScalarSup->pExprInfo, pOutBlock, pExtW->pTmpBlock, pScalarSup->pCtx,
                                                  pScalarSup->numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo), true,
                                                  pScalarSup->hasIndefRowsFunc));

  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pWinRowIdx, pOutBlock, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  } else {
    qDebug("%s %s project succeed", pOperator->pTaskInfo->id.str, __func__);
  }

  return code;
}
1546

1547
/*
 * Scalar (projection) mode: split one input block into per-window row ranges and project each.
 *
 * The operator's getWinFp callback is invoked repeatedly; each call yields the next window together
 * with the start position and row count of its rows inside pInputBlock. Iteration stops when the
 * callback reports no window (pWin == NULL).
 *
 * Returns TSDB_CODE_SUCCESS or the first error from the callback / extWinProjectDo().
 */
static int32_t extWinProjectOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExtWinTimeWindow*       pWin = NULL;
  int32_t                  startPos = 0, winRows = 0;
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);

  for (;;) {
    TAOS_CHECK_EXIT(pExtW->getWinFp(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
    if (NULL == pWin) {
      break;
    }

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") project start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);

    TAOS_CHECK_EXIT(extWinProjectDo(pOperator, pInputBlock, startPos, winRows, pWin));

    // continue right after the rows consumed by this window
    startPos += winRows;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1577

1578
/*
 * Evaluate the indefinite-rows functions of the operator for one (already sliced) block.
 *
 * If the operator has scalar expressions they are first applied to pBlock in place. pBlock is then
 * bound as the input of the operator's expression contexts, pRes is grown so that it can take
 * pBlock->info.rows additional rows, and the operator's expressions are applied from pBlock
 * into pRes.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinIndefRowsDoImpl(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*               pSup = &pOperator->exprSupp;
  SExprSupp*               pScalarSup = &pExtW->scalarSupp;
  int32_t                  order = pExtW->binfo.inputTsOrder;
  int32_t                  scanFlag = pBlock->info.scanFlag;

  if (NULL != pScalarSup->pExprInfo) {
    // scalar pre-processing writes its output back into the same block
    TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                          pExtW->pPseudoColInfo, GET_STM_RTINFO(pTaskInfo)));
  }

  TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, order, scanFlag, false));

  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));

  TAOS_CHECK_EXIT(projectApplyFunctions(pSup->pExprInfo, pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                                        pExtW->pPseudoColInfo, GET_STM_RTINFO(pTaskInfo)));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1608

1609
static int32_t extWinIndefRowsSetWinOutputBuf(SExternalWindowOperator* pExtW, SExtWinTimeWindow* win, SExprSupp* pSupp, 
×
1610
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo, bool reset) {
1611
  int32_t code = 0, lino = 0;
×
1612
  SResultRow* pResultRow = NULL;
×
1613

1614
  pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
×
1615
  
1616
  qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
×
1617

1618
  if (reset) {
×
1619
    memset(pResultRow, 0, pAggSup->resultRowSize);
×
1620
    for (int32_t k = 0; k < pSupp->numOfExprs; ++k) {
×
1621
      SqlFunctionCtx* pCtx = &pSupp->pCtx[k];
×
1622
      pCtx->pOutput = NULL;
×
1623
    }
1624
  }
1625

1626
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
×
1627

1628
  // set time window for current result
1629
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
×
1630

1631
_exit:
×
1632
  
1633
  if (code) {
×
1634
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1635
  }
1636

1637
  return code;
×
1638
}
1639

1640
/*
 * Indefinite-rows variant of the "get result block for window" step.
 *
 * Besides resolving the window's output block list (existing slot, the most recently assigned slot
 * when extWinLastWinClosed() reports true, or a newly created list), it binds the window's result
 * row through extWinIndefRowsSetWinOutputBuf(). The row is reset exactly when the window had no
 * output slot on entry (winOutIdx < 0).
 *
 * ppRes / ppIdx are filled in by extWinGetLastBlockFromList().
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinGetSetWinResBlockBuf(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SList*                   pBlkList = NULL;
  bool                     firstUse = (pWin->winOutIdx < 0);

  if (!firstUse) {
    pBlkList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
  } else if (extWinLastWinClosed(pExtW)) {
    pWin->winOutIdx = pExtW->outWinIdx - 1;
    pBlkList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
  } else {
    pWin->winOutIdx = pExtW->outWinIdx++;

    pBlkList = tdListNew(POINTER_BYTES * 2);
    TSDB_CHECK_NULL(pBlkList, code, lino, _exit, terrno);

    // hand the slot's previous content to the recycler before installing the new list
    SList** ppSlot = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
    extWinRecycleBlockList(pExtW, ppSlot);
    *ppSlot = pBlkList;
  }

  TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, firstUse));

  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pBlkList, rows, ppRes, ppIdx));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1673

1674

1675
/*
 * Process one window's slice of the input block in indefinite-rows mode.
 *
 * The window's result block is resolved first, then `rows` rows starting at `startPos` are copied
 * into the operator's reusable pTmpBlock (created on first use, cleaned otherwise), the functions
 * are evaluated by extWinIndefRowsDoImpl(), and extWinAppendWinIdx() is called with the current
 * window index and `rows`.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinIndefRowsDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SSDataBlock*             pOutBlock = NULL;
  SArray*                  pWinRowIdx = NULL;

  TAOS_CHECK_EXIT(extWinGetSetWinResBlockBuf(pOperator, rows, pWin, &pOutBlock, &pWinRowIdx));

  if (pExtW->pTmpBlock) {
    blockDataCleanup(pExtW->pTmpBlock);
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
  }

  // capacity of at least one row is requested even when `rows` is 0
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));

  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
  TAOS_CHECK_EXIT(extWinIndefRowsDoImpl(pOperator, pOutBlock, pExtW->pTmpBlock));

  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pWinRowIdx, pOutBlock, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1704

1705

1706
/*
 * Indefinite-rows mode: split one input block into per-window row ranges and process each.
 *
 * The operator's getWinFp callback is invoked repeatedly; each call yields the next window together
 * with the start position and row count of its rows inside pInputBlock. Iteration stops when the
 * callback reports no window (pWin == NULL).
 *
 * Returns TSDB_CODE_SUCCESS or the first error from the callback / extWinIndefRowsDo().
 */
static int32_t extWinIndefRowsOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExtWinTimeWindow*       pWin = NULL;
  int32_t                  startPos = 0, winRows = 0;
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);

  for (;;) {
    TAOS_CHECK_EXIT(pExtW->getWinFp(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
    if (NULL == pWin) {
      break;
    }

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") indefRows start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);

    TAOS_CHECK_EXIT(extWinIndefRowsDo(pOperator, pInputBlock, startPos, winRows, pWin));

    // continue right after the rows consumed by this window
    startPos += winRows;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1736

1737
/*
 * Hand out the next result block in scalar / indefinite-rows mode.
 *
 * Output windows are visited in order starting at pExtW->outputWinId. Windows whose block list is
 * empty are skipped. From the first non-empty list the head node is popped: its payload holds a
 * SSDataBlock* followed by a SArray* (the block's window-index array). The array is published in
 * the stream runtime info, the node is remembered in pExtW->pLastBlkNode, and when the list became
 * empty the output cursor moves on to the next window.
 *
 * *ppRes receives the block, or NULL when no block is left or the popped block has no rows.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t extWinNonAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  int32_t                  numOfWin = pExtW->outWinIdx;
  SSDataBlock*             pRes = NULL;

  while (pExtW->outputWinId < numOfWin) {
    SList* pBlkList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);

    if (listNEles(pBlkList) > 0) {
      SListNode* pHead = tdListPopHead(pBlkList);

      pRes = *(SSDataBlock**)pHead->data;
      pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = *((SArray**)pHead->data + 1);
      pExtW->pLastBlkNode = pHead;

      if (listNEles(pBlkList) <= 0) {
        // that was the window's last block: advance both output cursors
        pExtW->outputWinId++;
        extWinIncCurWinOutIdx(pTaskInfo->pStreamRuntimeInfo);
      }

      break;
    }

    // nothing buffered for this window, move on
    pExtW->outputWinId++;
    extWinIncCurWinOutIdx(pTaskInfo->pStreamRuntimeInfo);
  }

  if (NULL == pRes) {
    pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = NULL;
    qDebug("%s ext window done", GET_TASKID(pOperator->pTaskInfo));
  } else {
    qDebug("%s result generated, rows:%" PRId64 , GET_TASKID(pOperator->pTaskInfo), pRes->info.rows);
    pRes->info.version = pTaskInfo->version;
    pRes->info.dataLoad = 1;
  }

  if (pRes && pRes->info.rows > 0) {
    *ppRes = pRes;
  } else {
    *ppRes = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
1782

1783
/*
 * Aggregate mode: feed the "empty input" block to every window that was skipped because it
 * received no data.
 *
 * Nothing is filled when the operator has no pEmptyInputBlock, or when pWin is given and has the
 * same [skey, ekey] as the last processed window. Otherwise the windows (lastWinId, endIdx] are
 * processed, where endIdx is the last window overall (allRemains) or the window just before the
 * current one. Each such window becomes the current window, gets its output buffer bound and is
 * aggregated over one row of the empty input block; lastWinId is advanced even if the
 * aggregation of that window failed.
 *
 * On success the real input block pBlock (if any) is bound again as expression input and, unless
 * allRemains is set, the current window index is restored to its value on entry.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinAggHandleEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains, SExtWinTimeWindow* pWin) {
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
  SExprSupp*               pSup = &pOperator->exprSupp;
  int32_t                  code = 0, lino = 0;
  int32_t                  currIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);

  if (NULL != pExtW->pEmptyInputBlock &&
      !(pWin && pWin->tw.skey == pExtW->lastSKey && pWin->tw.ekey == pExtW->lastEKey)) {
    bool         ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
    int32_t      endIdx = allRemains ? (pExtW->pWins->size - 1) : (currIdx - 1);
    SSDataBlock* pEmptyInput = pExtW->pEmptyInputBlock;

    if ((pExtW->lastWinId + 1) <= endIdx) {
      // at least one window to fill: switch the expression input to the empty block once
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pExtW->pEmptyInputBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
    }

    for (int32_t winIdx = pExtW->lastWinId + 1; winIdx <= endIdx; ++winIdx) {
      SExtWinTimeWindow* pEmptyWin = taosArrayGet(pExtW->pWins, winIdx);

      extWinSetCurWinIdx(pOperator, winIdx);
      qDebug("%s %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
             GET_TASKID(pOperator->pTaskInfo), winIdx, pEmptyWin->tw.skey, pEmptyWin->tw.ekey, ascScan);

      TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pEmptyWin, pSup, &pExtW->aggSup, pOperator->pTaskInfo));

      updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pEmptyWin->tw, 1);
      code = extWinAggDo(pOperator, 0, 1, pEmptyInput);
      pExtW->lastWinId = winIdx;
      TAOS_CHECK_EXIT(code);
    }
  }

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
  } else {
    if (pBlock) {
      // a failure here jumps back to _exit with `code` set, which takes the error branch above
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true));
    }

    if (!allRemains) {
      extWinSetCurWinIdx(pOperator, currIdx);
    }
  }

  return code;
}
1835

1836
/*
 * Aggregate mode: split one input block into per-window row ranges and aggregate each range.
 *
 * For every window returned by the getWinFp callback:
 *   - windows skipped before it are filled through extWinAggHandleEmptyWins();
 *   - the operator's scalar expressions (if any) are applied to the input block in place, once
 *     per block, when the first window is seen;
 *   - the window's output buffer is bound unless the window has the same [skey, ekey] as the
 *     previously processed one (a window whose skey is INT64_MIN is always re-bound);
 *   - the window's rows are aggregated and lastSKey / lastEKey / lastWinId are updated.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinAggOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = 0, lino = 0;
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
  SExtWinTimeWindow*       pWin = NULL;
  int32_t                  startPos = 0, winRows = 0;
  bool                     scalarDone = false;
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);

  for (;;) {
    TAOS_CHECK_EXIT(pExtW->getWinFp(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
    if (NULL == pWin) {
      break;
    }

    TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pInputBlock, false, pWin));

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") agg start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);

    if (!scalarDone) {
      scalarDone = true;

      SExprSupp* pScalarSup = &pExtW->scalarSupp;
      if (pScalarSup->pExprInfo) {
        TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pInputBlock, pInputBlock, pScalarSup->pCtx,
                                              pScalarSup->numOfExprs, pExtW->pPseudoColInfo,
                                              GET_STM_RTINFO(pOperator->pTaskInfo)));
      }
    }

    bool sameAsLastWin = (pWin->tw.skey == pExtW->lastSKey) && (pWin->tw.ekey == pExtW->lastEKey) &&
                         (pWin->tw.skey != INT64_MIN);
    if (!sameAsLastWin) {
      TAOS_CHECK_EXIT(
          extWinAggSetWinOutputBuf(pOperator, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo));
    }

    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
    TAOS_CHECK_EXIT(extWinAggDo(pOperator, startPos, winRows, pInputBlock));

    // remember what was processed last, then continue after this window's rows
    pExtW->lastSKey = pWin->tw.skey;
    pExtW->lastEKey = pWin->tw.ekey;
    pExtW->lastWinId = extWinGetCurWinIdx(pOperator->pTaskInfo);
    startPos += winRows;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1888

1889
/*
 * Aggregate mode: materialise finished result rows into the operator's result block.
 *
 * The result block and pExtW->pWinRowIdx are cleared first. Windows are then visited starting at
 * pExtW->outputWinId; windows without an output slot (winOutIdx < 0) or whose result row has no
 * rows are skipped. Each remaining result row is copied into the block (growing it on demand) and
 * registered through extWinAppendWinIdx(). Copying stops once the block reaches
 * pOperator->resultInfo.threshold rows; the next call resumes after the last emitted window.
 *
 * The operator's filter is applied afterwards. If it removed rows, the window-index array is
 * rebuilt from the filter result, or cleared when the filter returned no indicator column.
 *
 * *ppRes receives the block, or NULL when it ended up empty.
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*             pBlock = pExtW->binfo.pRes;
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExprInfo*               pExprInfo = pOperator->exprSupp.pExprInfo;
  int32_t                  numOfExprs = pOperator->exprSupp.numOfExprs;
  int32_t*                 rowEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;
  SqlFunctionCtx*          pCtx = pOperator->exprSupp.pCtx;
  int32_t                  numOfWin = pExtW->outWinIdx;
  // Declared and initialised up front: the cleanup code at _exit releases pFilterRes, and the
  // error paths inside the loop below jump there before the filter has run.
  SColumnInfoData*         pFilterRes = NULL;
  int32_t                  rowsBeforeFilter = 0;

  pBlock->info.version = pTaskInfo->version;
  blockDataCleanup(pBlock);
  taosArrayClear(pExtW->pWinRowIdx);

  for (; pExtW->outputWinId < pExtW->pWins->size; ++pExtW->outputWinId) {
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->outputWinId);
    int32_t            winIdx = pWin->winOutIdx;
    if (winIdx < 0) {
      continue;
    }

    pExtW->outputWinNum++;
    // 64-bit offset: index * row size can exceed the range of a 32-bit integer
    SResultRow* pRow = (SResultRow*)((char*)pExtW->pResultRow + (int64_t)winIdx * pExtW->aggSup.resultRowSize);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      continue;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      // reserve one extra row for each output window that has not been visited yet
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + numOfWin - pExtW->outputWinNum;
      TAOS_CHECK_EXIT(blockDataEnsureCapacity(pBlock, newSize));
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
    }

    TAOS_CHECK_EXIT(copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo));

    pBlock->info.rows += pRow->numOfRows;

    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pRow->winIdx, pRow->numOfRows));

    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
      // `break` skips the loop increment, so step past the emitted window by hand
      ++pExtW->outputWinId;
      break;
    }
  }

  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);

  rowsBeforeFilter = pBlock->info.rows;
  TAOS_CHECK_EXIT(doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, &pFilterRes));
  if (pBlock->info.rows < rowsBeforeFilter) {
    if (pFilterRes != NULL) {
      TAOS_CHECK_EXIT(extWinRebuildWinIdxByFilter(pTaskInfo, pExtW->pWinRowIdx, rowsBeforeFilter, pFilterRes));
    } else {
      // no indicator means all rows are filtered out by short-circuit path
      taosArrayClear(pExtW->pWinRowIdx);
    }
  }

  pBlock->info.dataLoad = 1;

  *ppRes = (pBlock->info.rows > 0) ? pBlock : NULL;

  if (*ppRes) {
    (*ppRes)->info.window.skey = pExtW->orgTableTimeRange.skey;
    (*ppRes)->info.window.ekey = pExtW->orgTableTimeRange.ekey;
  }
  if (pOperator->pTaskInfo->pStreamRuntimeInfo) {
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;
  }

_exit:
  colDataDestroy(pFilterRes);
  taosMemoryFree(pFilterRes);

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1978

1979
/*
 * Make sure pExtW->pResultRow can hold `winNum` result rows.
 *
 * Nothing is done in scalar mode, or when the current capacity already covers winNum. Otherwise
 * the old buffer is released (its content is not preserved) and a zero-filled buffer of
 * winNum * aggSup.resultRowSize bytes is allocated. While no buffer is held the capacity is -1.
 *
 * Returns TSDB_CODE_SUCCESS, or the allocation error taken from terrno.
 */
static int32_t extWinInitResultRow(SExecTaskInfo*        pTaskInfo, SExternalWindowOperator* pExtW, int32_t winNum) {
  int32_t code = 0, lino = 0;

  if (EEXT_MODE_SCALAR == pExtW->mode || winNum <= pExtW->resultRowCapacity) {
    return TSDB_CODE_SUCCESS;
  }

  // drop the undersized buffer first; the capacity stays -1 if the allocation below fails
  taosMemoryFreeClear(pExtW->pResultRow);
  pExtW->resultRowCapacity = -1;

  pExtW->pResultRow = taosMemoryCalloc(winNum, pExtW->aggSup.resultRowSize);
  QUERY_CHECK_NULL(pExtW->pResultRow, code, lino, _exit, terrno);

  pExtW->resultRowCapacity = winNum;

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
2006

2007
/*
 * Release large buffers held by the operator.
 *
 * The result-row buffer is freed (and its capacity reset to -1) when it is at least 1 MiB large.
 * The columns of the result block are freed when rows * aggSup.resultRowSize reaches the same
 * threshold. Smaller buffers are kept.
 */
static void extWinFreeResultRow(SExternalWindowOperator* pExtW) {
  const int64_t releaseThresholdBytes = 1048576;  // 1 MiB

  // Sizes are computed in 64-bit arithmetic: capacity * row size can exceed the range of a 32-bit
  // integer, which would make the comparison unreliable for exactly the buffers that matter most.
  int64_t rowBufBytes = (int64_t)pExtW->resultRowCapacity * pExtW->aggSup.resultRowSize;
  if (rowBufBytes >= releaseThresholdBytes) {
    taosMemoryFreeClear(pExtW->pResultRow);
    pExtW->resultRowCapacity = -1;
  }

  if (pExtW->binfo.pRes &&
      (int64_t)pExtW->binfo.pRes->info.rows * pExtW->aggSup.resultRowSize >= releaseThresholdBytes) {
    blockDataFreeCols(pExtW->binfo.pRes);
  }
}
283,122✔
2016

2017
/*
 * Build the operator's window list from the stream runtime info of the task.
 *
 * Does nothing when pExtW->pWins already contains windows. Otherwise one window is reserved per
 * entry of pStreamPesudoFuncVals and the result-row buffer is sized accordingly. The window
 * ranges are either computed by scalarCalculateExtWinsTimeRange() (when the operator has a time
 * range expression that needs calculation) or copied from the trigger parameters; in the latter
 * case ekey is wend + 1 unless the trigger type is STREAM_TRIGGER_SLIDING, and winOutIdx is set
 * to -1. Finally the output / block cursors are positioned at the runtime info's curIdx.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinInitWindowList(SExternalWindowOperator* pExtW, SExecTaskInfo*        pTaskInfo) {
  if (taosArrayGetSize(pExtW->pWins) > 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                 code = 0, lino = 0;
  SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
  size_t                  size = taosArrayGetSize(pInfo->pStreamPesudoFuncVals);

  SExtWinTimeWindow* pWin = taosArrayReserve(pExtW->pWins, size);
  TSDB_CHECK_NULL(pWin, code, lino, _exit, terrno);

  TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, size));

  if (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc) {
    for (int32_t i = 0; i < size; ++i) {
      SSTriggerCalcParam* pParam = taosArrayGet(pInfo->pStreamPesudoFuncVals, i);

      pWin[i].winOutIdx = -1;
      pWin[i].tw.skey = pParam->wstart;
      pWin[i].tw.ekey = pParam->wend + ((pInfo->triggerType != STREAM_TRIGGER_SLIDING) ? 1 : 0);

      qDebug("%s the %d/%d ext window initialized, TR[%" PRId64 ", %" PRId64 ")",
          pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
    }
  } else {
    TAOS_CHECK_EXIT(scalarCalculateExtWinsTimeRange(pExtW->timeRangeExpr, pInfo, pWin));

    // the flag check keeps the loop from running when debug logging is off
    if (qDebugFlag & DEBUG_DEBUG) {
      for (int32_t i = 0; i < size; ++i) {
        qDebug("%s the %d/%d ext window calced initialized, TR[%" PRId64 ", %" PRId64 ")",
            pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
      }
    }
  }

  pExtW->lastWinId = -1;
  pExtW->outputWinId = pInfo->curIdx;
  pExtW->blkWinStartIdx = pInfo->curIdx;

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
2063

2064
/*
 * Tell whether scalar / indefinite-rows mode already holds a result block that can be returned
 * before more input is consumed.
 *
 * Returns false for unordered multi-table input. Returns true when more than one output window is
 * pending and window ranges are not calculated from a time range expression, when the current
 * output window holds more than one block, or when the last window index recorded for its single
 * block lies before pExtW->blkWinStartIdx. Returns false otherwise.
 */
static bool extWinNonAggGotResBlock(SExternalWindowOperator* pExtW) {
  if (pExtW->multiTableMode && !pExtW->inputHasOrder) {
    return false;
  }

  bool    calcTimeRange = (NULL != pExtW->timeRangeExpr) && pExtW->timeRangeExpr->needCalc;
  int32_t pendingWins = pExtW->outWinIdx - pExtW->outputWinId;
  if (pendingWins > 1 && !calcTimeRange) {
    return true;
  }

  SList* pBlkList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
  if (NULL == pBlkList || listNEles(pBlkList) <= 0) {
    return false;
  }
  if (listNEles(pBlkList) > 1) {
    return true;
  }

  // exactly one block: inspect the window-index array stored next to the block pointer
  SListNode* pHead = listHead(pBlkList);
  SArray*    pWinIdxArr = *((SArray**)pHead->data + 1);
  int32_t*   pLastWinIdx = taosArrayGetLast(pWinIdxArr);

  return (NULL != pLastWinIdx) && (*pLastWinIdx < pExtW->blkWinStartIdx);
}
2090

2091
/*
 * Read the timestamps of the first and the last row of a data block.
 *
 * The timestamp column is the first column whose colId equals tsSlotId; when no column matches,
 * the last column of the block is used.
 *
 * @param pBlock   block to inspect
 * @param tsSlotId column id of the timestamp column
 * @param startTs  out: timestamp of row 0
 * @param endTs    out: timestamp of the last row
 *
 * For a block without rows there is no first / last row to read, so an empty range is reported
 * (*startTs = INT64_MAX, *endTs = INT64_MIN); merging it with TMIN / TMAX leaves an accumulated
 * range unchanged.
 *
 * Returns TSDB_CODE_SUCCESS, or the error taken from terrno when a column cannot be fetched.
 */
static int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pBlock->info.rows <= 0) {
    // guard against reading row index -1 below
    *startTs = INT64_MAX;
    *endTs = INT64_MIN;
    return code;
  }

  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  int32_t tsIndex = numOfCols - 1;  // fallback when no column id matches
  for (int32_t i = 0; i < numOfCols; i++) {
    SColumnInfoData *pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pCol, code, lino, _return, terrno)
    if (pCol->info.colId == tsSlotId) {
      tsIndex = i;
      break;
    }
  }

  SColumnInfoData *pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsIndex);
  QUERY_CHECK_NULL(pColData, code, lino, _return, terrno)

  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, 0), 0);
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, pBlock->info.rows - 1), 0);

  return code;
_return:
  qError("failed to get time window of block, %s code:%s, line:%d", __func__, tstrerror(code), lino);
  return code;
}
2119

2120
/*
 * Open the external window operator: set up the window list and consume downstream blocks.
 *
 * Window list:
 *   - With an operator get-param, the window array is taken over from the parameter, the
 *     per-execution state is reset, the operator is flagged as dynamic-window, the downstream
 *     exchange parameter (when not multi-param) is pointed at the operator's original table, and
 *     the get-param is freed.
 *   - Without one, the list is built by extWinInitWindowList().
 *
 * Input loop: every downstream block is dispatched by mode (scalar / aggregate / indefinite
 * rows). In dynamic-window mode the overall timestamp range of the consumed blocks is tracked.
 * In scalar and indefinite-rows mode the function returns early, without marking the operator
 * opened, as soon as extWinNonAggGotResBlock() reports a result block. When the downstream is
 * exhausted, aggregate mode fills all remaining empty windows and the operator is marked opened.
 *
 * On error the code is stored in the task and control leaves through T_LONG_JMP.
 */
static int32_t extWinOpen(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator) && !pOperator->pOperatorGetParam) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SExternalWindowOperator* pExtW = pOperator->info;

  if (pOperator->pOperatorGetParam) {
    SOperatorParam*               pParam = (SOperatorParam*)(pOperator->pOperatorGetParam);
    SOperatorParam*               pDownParam = (SOperatorParam*)(pOperator->pDownstreamGetParams[0]);
    SExchangeOperatorParam*       pExecParam = NULL;
    SExternalWindowOperatorParam* pExtPram = (SExternalWindowOperatorParam*)pParam->value;

    if (pExtW->pWins) {
      taosArrayDestroy(pExtW->pWins);
    }

    // Take ownership of the window array and clear the source pointer right away, so that a
    // failure below cannot leave the operator and the parameter both referring to the same array.
    pExtW->pWins = pExtPram->ExtWins;
    pExtPram->ExtWins = NULL;

    TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, taosArrayGetSize(pExtW->pWins)));
    pExtW->outputWinId = 0;
    pExtW->lastWinId = -1;
    pExtW->blkWinStartIdx = 0;
    pExtW->outWinIdx = 0;
    pExtW->lastSKey = INT64_MIN;
    pExtW->lastEKey = INT64_MIN;
    pExtW->isDynWindow = true;
    // start from an empty range; it is widened per consumed block in the loop below
    pExtW->orgTableTimeRange.skey = INT64_MAX;
    pExtW->orgTableTimeRange.ekey = INT64_MIN;

    QUERY_CHECK_CONDITION(pOperator->numOfDownstream == 1, code, lino, _exit, TSDB_CODE_INVALID_PARA)

    switch (pDownParam->opType) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
        pExecParam = (SExchangeOperatorParam*)pDownParam->value;
        if (!pExecParam->multiParams) {
          pExecParam->basic.vgId = pExtW->orgTableVgId;
          taosArrayClear(pExecParam->basic.uidList);
          QUERY_CHECK_NULL(taosArrayPush(pExecParam->basic.uidList, &pExtW->orgTableUid), code, lino, _exit, terrno)
        }
        break;
      }
      default:
        break;
    }

    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
    pOperator->pOperatorGetParam = NULL;
  } else {
    TAOS_CHECK_EXIT(extWinInitWindowList(pExtW, pTaskInfo));
  }

  while (1) {
    // per-block window cursor state
    pExtW->blkWinIdx = -1;
    pExtW->blkWinStartSet = false;
    pExtW->blkRowStartIdx = 0;

    SSDataBlock* pBlock = getNextBlockFromDownstreamRemainDetach(pOperator, 0);
    if (pBlock == NULL) {
      if (EEXT_MODE_AGG == pExtW->mode) {
        TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pBlock, true, NULL));
      }
      pExtW->blkWinStartIdx = pExtW->pWins->size;
      break;
    }

    if (pExtW->isDynWindow) {
      TSKEY skey = 0;
      TSKEY ekey = 0;
      code = getTimeWindowOfBlock(pBlock, pExtW->primaryTsIndex, &skey, &ekey);
      QUERY_CHECK_CODE(code, lino, _exit);
      pExtW->orgTableTimeRange.skey = TMIN(pExtW->orgTableTimeRange.skey, skey);
      pExtW->orgTableTimeRange.ekey = TMAX(pExtW->orgTableTimeRange.ekey, ekey);
    }

    printDataBlock(pBlock, __func__, pTaskInfo->id.str, pTaskInfo->id.queryId);

    qDebug("ext window mode:%d got %" PRId64 " rows from downstream", pExtW->mode, pBlock->info.rows);

    switch (pExtW->mode) {
      case EEXT_MODE_SCALAR:
        TAOS_CHECK_EXIT(extWinProjectOpen(pOperator, pBlock));
        if (extWinNonAggGotResBlock(pExtW)) {
          // a result block is ready: return without marking the operator opened
          return code;
        }
        break;
      case EEXT_MODE_AGG:
        TAOS_CHECK_EXIT(extWinAggOpen(pOperator, pBlock));
        break;
      case EEXT_MODE_INDEFR_FUNC:
        TAOS_CHECK_EXIT(extWinIndefRowsOpen(pOperator, pBlock));
        if (extWinNonAggGotResBlock(pExtW)) {
          // a result block is ready: return without marking the operator opened
          return code;
        }
        break;
      default:
        break;
    }
  }

  if (pOperator->pOperatorGetParam) {
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
    pOperator->pOperatorGetParam = NULL;
  }
  OPTR_SET_OPENED(pOperator);

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return code;
}
2253

2254
/*
 * Produce the next result block of the external window operator.
 *
 * A finished operator (OP_EXEC_DONE) returns NULL straight away unless a get-param is attached,
 * in which case it is flipped back to OP_NOT_OPENED so that the open function runs again.
 * The block node remembered in pLastBlkNode is handed to extWinRecycleBlkNode() before any new
 * output is built.
 *
 * @param pOperator  the external window operator.
 * @param ppRes      receives the result block, or NULL when nothing is returned; a block whose
 *                   row count is <= 0 is reported as NULL.
 * @return the status code; on a non-zero code the code is stored in pTaskInfo->code and
 *         T_LONG_JMP(pTaskInfo->env, code) is invoked.
 */
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SExternalWindowOperator* pExtW = pOperator->info;

  if (OP_EXEC_DONE == pOperator->status) {
    if (NULL == pOperator->pOperatorGetParam) {
      *ppRes = NULL;
      return code;
    }

    // a pending get-param means the operator has to be opened again
    pOperator->status = OP_NOT_OPENED;
  }

  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);

  if (OP_NOT_OPENED == pOperator->status) {
    TAOS_CHECK_EXIT(pOperator->fpSet._openFn(pOperator));
  }

  switch (pExtW->mode) {
    case EEXT_MODE_SCALAR:
    case EEXT_MODE_INDEFR_FUNC:
      TAOS_CHECK_EXIT(extWinNonAggOutputRes(pOperator, ppRes));
      if (NULL == *ppRes) {
        setOperatorCompleted(pOperator);
        extWinFreeResultRow(pExtW);
      }
      break;

    default:
      // every other mode takes the aggregate output path
      for (;;) {
        TAOS_CHECK_EXIT(extWinAggOutputRes(pOperator, ppRes));
        if (*ppRes != NULL) {
          break;
        }

        if (pExtW->outputWinId < pExtW->pWins->size) {
          // no block this round, but there are still windows left to output
          continue;
        }

        setOperatorCompleted(pOperator);
        if (pTaskInfo->pStreamRuntimeInfo) {
          extWinFreeResultRow(pExtW);
        }
        break;
      }
      break;
  }

  if (*ppRes != NULL) {
    pOperator->resultInfo.totalRows += (*ppRes)->info.rows;
    printDataBlock(*ppRes, __func__, GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
  }

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if ((*ppRes) != NULL && (*ppRes)->info.rows <= 0) {
    *ppRes = NULL;
  }

  if ((*ppRes) != NULL && pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
  }

  return code;
}
2333

2334

2335
/*
 * Build the executor operator for an external-window physical plan node.
 *
 * The operator mode is derived from the plan node:
 *   - EEXT_MODE_SCALAR       when window.pProjs is set,
 *   - EEXT_MODE_INDEFR_FUNC  when window.indefRowsFunc is set,
 *   - EEXT_MODE_AGG          otherwise.
 * Each mode initializes its own expression/aggregate support; all modes share the window array,
 * the optional time-range expression and the window lookup function (getWinFp).
 *
 * Side effect on the plan: window.node.inputTsOrder of the plan node is overwritten with
 * TSDB_ORDER_ASC.
 *
 * @param pDownstream  child operator, appended as the single downstream. On failure it is passed
 *                     to destroyOperatorAndDownstreams() together with the new operator.
 * @param pNode        physical plan node, accessed as SExternalWindowPhysiNode.
 * @param pTaskInfo    owning task; its code field is set to the error code on failure.
 * @param pOptrOut     receives the new operator on success.
 * @return 0 on success, otherwise the error code that aborted construction.
 */
int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
                                     SOperatorInfo** pOptrOut) {
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
  QRY_PARAM_CHECK(pOptrOut);
  int32_t                  code = 0;
  int32_t                  lino = 0;
  bool                     useOvlpWin = false;
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (!pExtW || !pOperator) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  // pOperator may only be written once both allocations are known to have succeeded
  pOperator->pPhyNode = pNode;

  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
                  pExtW, pTaskInfo);

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pExtW->binfo, pResBlock);

  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR
                                        : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
  pExtW->isDynWindow = false;

  if (pTaskInfo->pStreamRuntimeInfo != NULL) {
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
  }

  if (pPhynode->window.pProjs) {
    // scalar (projection) mode
    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);

    // QUERY_CHECK_NULL stores terrno into code, so the failure is not reported as success
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
    QUERY_CHECK_NULL(pExtW->pOutputBlocks, code, lino, _error, terrno);

    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
  } else if (pExtW->mode == EEXT_MODE_AGG) {
    // aggregate mode
    if (pPhynode->window.pExprs != NULL) {
      int32_t    num = 0;
      SExprInfo* pSExpr = NULL;
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
      QUERY_CHECK_CODE(code, lino, _error);

      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
      QUERY_CHECK_CODE(code, lino, _error);
      checkIndefRowsFuncs(&pExtW->scalarSupp);
    }

    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
    initResultSizeInfo(&pOperator->resultInfo, 4096);

    pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
    TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);

    int32_t    num = 0;
    SExprInfo* pExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
    QUERY_CHECK_CODE(code, lino, _error);
    pOperator->exprSupp.hasWindow = true;
    pOperator->exprSupp.hasWindowOrGroup = true;
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, 0, 0);
    QUERY_CHECK_CODE(code, lino, _error);

    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                              pTaskInfo->pStreamRuntimeInfo);
    QUERY_CHECK_CODE(code, lino, _error);

    nodesWalkExprs(pPhynode->window.pFuncs, extWinHasCountLikeFunc, &pExtW->hasCountFunc);
    if (pExtW->hasCountFunc) {
      code = extWinCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
      QUERY_CHECK_CODE(code, lino, _error);
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
    } else {
      qDebug("%s ext window doesn't have CountLikeFunc", pOperator->pTaskInfo->id.str);
    }

    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
    QUERY_CHECK_CODE(code, lino, _error);

    pExtW->lastSKey = INT64_MIN;
    pExtW->lastEKey = INT64_MIN;
  } else {
    // indefinite-rows function mode
    size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

    if (pPhynode->window.pExprs != NULL) {
      int32_t    num = 0;
      SExprInfo* pSExpr = NULL;
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
      QUERY_CHECK_CODE(code, lino, _error);

      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
      QUERY_CHECK_CODE(code, lino, _error);
    }

    int32_t    numOfExpr = 0;
    SExprInfo* pExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
    TSDB_CHECK_CODE(code, lino, _error);

    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
                      NULL, &pTaskInfo->storageAPI.functionStore);
    TSDB_CHECK_CODE(code, lino, _error);
    pOperator->exprSupp.hasWindowOrGroup = false;

    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                              pTaskInfo->pStreamRuntimeInfo);
    TSDB_CHECK_CODE(code, lino, _error);

    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
    TSDB_CHECK_CODE(code, lino, _error);

    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
    QUERY_CHECK_NULL(pExtW->pOutputBlocks, code, lino, _error, terrno);

    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
  }

  pExtW->pWins = taosArrayInit(4096, sizeof(SExtWinTimeWindow));
  QUERY_CHECK_NULL(pExtW->pWins, code, lino, _error, terrno);

  pExtW->timeRangeExpr = (STimeRangeNode*)pPhynode->pTimeRange;
  if (pExtW->timeRangeExpr) {
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pStart, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pEnd, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);

    // The overlap-aware lookup is needed when the range has to be calculated or when the stream
    // asks for sliding overlap. pStreamRuntimeInfo is nullable (see the check above), so it is
    // guarded before addOptions is read.
    useOvlpWin = pExtW->timeRangeExpr->needCalc ||
                 (pTaskInfo->pStreamRuntimeInfo != NULL &&
                  (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP));
  }

  if (pPhynode->isSingleTable) {
    pExtW->getWinFp = useOvlpWin ? extWinGetOvlpWin : extWinGetNoOvlpWin;
    pExtW->multiTableMode = false;
  } else {
    pExtW->getWinFp = useOvlpWin ? extWinGetMultiTbOvlpWin : extWinGetMultiTbNoOvlpWin;
    pExtW->multiTableMode = true;
  }
  pExtW->inputHasOrder = pPhynode->inputHasOrder;
  pExtW->orgTableUid = pPhynode->orgTableUid;
  pExtW->orgTableVgId = pPhynode->orgTableVgId;

  pOperator->fpSet = createOperatorFpSet(extWinOpen, extWinNext, NULL, destroyExternalWindowOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
  code = appendDownstream(pOperator, &pDownstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrOut = pOperator;
  return code;

_error:

  if (pExtW != NULL) {
    destroyExternalWindowOperatorInfo(pExtW);
  }

  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc