• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.98
/source/libs/executor/src/streamexternalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22
#include "cmdnodes.h"
23

24
typedef struct SBlockList {
25
  const SSDataBlock* pSrcBlock;
26
  SList*             pBlocks;
27
  int32_t            blockRowNumThreshold;
28
} SBlockList;
29

30

31
// Function-pointer type stored in SExternalWindowOperator::getWinFp.
// NOTE(review): no implementation is visible in this part of the file; the parameter
// roles (operator, ts column, row cursor, block info, out window, out count) are
// presumed from the types only -- confirm against the implementations.
typedef int32_t (*extWinGetWinFp)(SOperatorInfo*, int64_t*, int32_t*, SDataBlockInfo*, SExtWinTimeWindow**, int32_t*);
32

33
/* Counters describing how result blocks are allocated and recycled; logged when the operator is destroyed. */
typedef struct SExtWindowStat {
  int64_t resBlockCreated;    // bumped when extWinBlockListAddBlock allocates a brand-new block
  int64_t resBlockDestroyed;  // bumped when a block node is freed (free-list full, or operator teardown)
  int64_t resBlockRecycled;   // bumped when a block node is returned to the free list
  int64_t resBlockReused;     // bumped when a block node is taken from the free list
  int64_t resBlockAppend;     // bumped when rows fit into the tail block of an existing list
} SExtWindowStat;
40

41
typedef struct SExternalWindowOperator {
42
  SOptrBasicInfo     binfo;
43
  SExprSupp          scalarSupp;
44
  int32_t            primaryTsIndex;
45
  EExtWinMode        mode;
46
  bool               multiTableMode;
47
  bool               inputHasOrder;
48
  SArray*            pWins;           // SArray<SExtWinTimeWindow>
49
  SArray*            pPseudoColInfo;  
50
  STimeRangeNode*    timeRangeExpr;
51
  
52
  extWinGetWinFp     getWinFp;
53

54
  bool               blkWinStartSet;
55
  int32_t            blkWinStartIdx;
56
  int32_t            blkWinIdx;
57
  int32_t            blkRowStartIdx;
58
  int32_t            outputWinId;
59
  int32_t            outputWinNum;
60
  int32_t            outWinIdx;
61

62
  // for project&indefRows
63
  SList*             pFreeBlocks;    // SList<SSDatablock*+SAarray*>
64
  SArray*            pOutputBlocks;  // SArray<SList*>, for each window, we have a list of blocks
65
  SListNode*         pLastBlkNode; 
66
  SSDataBlock*       pTmpBlock;
67
  
68
  // for agg
69
  SAggSupporter      aggSup;
70
  STimeWindowAggSupp twAggSup;
71

72
  int32_t            resultRowCapacity;
73
  SResultRow*        pResultRow;
74

75
  int64_t            lastSKey;
76
  int64_t            lastEKey;
77
  int32_t            lastWinId;
78
  SSDataBlock*       pEmptyInputBlock;
79
  bool               hasCountFunc;
80
  SExtWindowStat     stat;
81
  SArray*            pWinRowIdx;
82

83
  // for vtable window query
84
  bool               isDynWindow;
85
  int32_t            orgTableVgId;
86
  tb_uid_t           orgTableUid;
87
  STimeWindow        orgTableTimeRange;
88
} SExternalWindowOperator;
89

90

91
/*
 * Append one {result block, window-row index array} pair to pList and hand both back to the caller.
 *
 * A node is taken from pExtW->pFreeBlocks when one is available; otherwise a new block
 * (capacity at least max(rows, 4096)) and a new index array are created.
 *
 * @param pExtW    operator state (free list and statistics)
 * @param pList    list that receives the node
 * @param rows     number of rows the caller is about to write
 * @param ppBlock  out: block to write into
 * @param ppIdx    out: index array paired with the block
 * @return 0 on success, otherwise an error code; on failure nothing is added to pList and
 *         *ppBlock / *ppIdx are left untouched.
 */
static int32_t extWinBlockListAddBlock(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
  SSDataBlock* pRes = NULL;
  SArray*      pIdx = NULL;
  int32_t      code = 0, lino = 0;

  if (listNEles(pExtW->pFreeBlocks) > 0) {
    // Reuse: the node payload is two pointers laid out back to back: {SSDataBlock*, SArray*}.
    SListNode* pNode = tdListPopHead(pExtW->pFreeBlocks);
    *ppBlock = *(SSDataBlock**)pNode->data;
    *ppIdx = *(SArray**)((SArray**)pNode->data + 1);
    tdListAppendNode(pList, pNode);
    pExtW->stat.resBlockReused++;
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pExtW->binfo.pRes, false, &pRes));
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, TMAX(rows, 4096)));
    pIdx = taosArrayInit(10, sizeof(int64_t));
    TSDB_CHECK_NULL(pIdx, code, lino, _exit, terrno);
    void* res[2] = {pRes, pIdx};
    TAOS_CHECK_EXIT(tdListAppend(pList, res));

    *ppBlock = pRes;
    *ppIdx = pIdx;
    pExtW->stat.resBlockCreated++;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // Only the "create" branch can fail, and only before the list took ownership,
    // so both objects are still ours to release here.
    blockDataDestroy(pRes);
    taosArrayDestroy(pIdx);
  }

  return code;
}
123

124
/*
 * Return the tail block of pList when `rows` more rows still fit into it; otherwise
 * add a block via extWinBlockListAddBlock and return that one.
 *
 * @param ppBlock  out: block to append into
 * @param ppIdx    out: index array stored next to that block in the list node
 * @return 0 on success, otherwise the error from extWinBlockListAddBlock
 */
static int32_t extWinGetLastBlockFromList(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
  int32_t      code = 0, lino = 0;
  SListNode*   pTail = TD_DLIST_TAIL(pList);
  SSDataBlock* pLast = (NULL != pTail) ? *(SSDataBlock**)pTail->data : NULL;

  // Empty list, or the tail block cannot hold the incoming rows: get another block.
  bool needBlock = (NULL == pTail) || ((pLast->info.rows + rows) > pLast->info.capacity);
  if (needBlock) {
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
    return code;
  }

  *ppIdx = *(SArray**)((SSDataBlock**)pTail->data + 1);
  *ppBlock = pLast;
  pExtW->stat.resBlockAppend++;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
152

153
static void extWinDestroyBlockList(void* p) {
41,000,960✔
154
  if (NULL == p) {
41,000,960✔
NEW
155
    return;
×
156
  }
157

158
  SListNode* pTmp = NULL;
41,000,960✔
159
  SList** ppList = (SList**)p;
41,000,960✔
160
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
41,000,960✔
NEW
161
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
NEW
162
    while (pNode) {
×
NEW
163
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
NEW
164
      blockDataDestroy(pBlock);
×
NEW
165
      SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
×
NEW
166
      taosArrayDestroy(pIdx);
×
NEW
167
      pTmp = pNode;
×
NEW
168
      pNode = pNode->dl_next_;
×
NEW
169
      taosMemoryFree(pTmp);
×
170
    }
171
  }
172
  taosMemoryFree(*ppList);
41,000,960✔
173
}
174

175

176
/*
 * Give a block node back to the operator.
 * While the free list holds fewer than 10 nodes, the block and index array are emptied and
 * the node is pushed onto the free list; otherwise the node and its payload are destroyed.
 * In both cases *ppNode is NULL on return.
 */
static void extWinRecycleBlkNode(SExternalWindowOperator* pExtW, SListNode** ppNode) {
  if (NULL == ppNode) {
    return;
  }

  SListNode* pNode = *ppNode;
  if (NULL == pNode) {
    return;
  }

  SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
  SArray*      pIdx = *(SArray**)((SArray**)pNode->data + 1);

  if (listNEles(pExtW->pFreeBlocks) < 10) {
    // Room in the cache: keep the allocations, drop only the contents.
    blockDataCleanup(pBlock);
    taosArrayClear(pIdx);
    tdListPrependNode(pExtW->pFreeBlocks, pNode);
    *ppNode = NULL;
    pExtW->stat.resBlockRecycled++;
    return;
  }

  // Cache is full: release everything.
  blockDataDestroy(pBlock);
  taosArrayDestroy(pIdx);
  taosMemoryFreeClear(*ppNode);
  pExtW->stat.resBlockDestroyed++;
}
198

199
/*
 * Recycle every node of the list that `p` (an SList**) points at through
 * extWinRecycleBlkNode, then free the list header itself.
 */
static void extWinRecycleBlockList(SExternalWindowOperator* pExtW, void* p) {
  if (NULL == p) {
    return;
  }

  SList* pList = *(SList**)p;
  if (pList != NULL && TD_DLIST_NELES(pList) > 0) {
    for (SListNode* pCur = TD_DLIST_HEAD(pList); pCur != NULL;) {
      // Read the successor first: recycling re-links or frees the current node.
      SListNode* pVictim = pCur;
      pCur = pCur->dl_next_;
      extWinRecycleBlkNode(pExtW, &pVictim);
    }
  }

  taosMemoryFree(pList);
}
216
/*
 * Destroy one block node: its data block, its index array and the node memory.
 * A NULL node is ignored. Counts the destruction in pInfo->stat.
 */
static void extWinDestroyBlkNode(SExternalWindowOperator* pInfo, SListNode* pNode) {
  if (pNode != NULL) {
    // Payload layout: {SSDataBlock*, SArray*}
    SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
    SArray*      pIdx = *(SArray**)((SArray**)pNode->data + 1);

    blockDataDestroy(pBlock);
    taosArrayDestroy(pIdx);
    taosMemoryFree(pNode);

    pInfo->stat.resBlockDestroyed++;
  }
}
231

232

233
static void destroyExternalWindowOperatorInfo(void* param) {
188,166✔
234
  if (NULL == param) {
188,166✔
NEW
235
    return;
×
236
  }
237
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
188,166✔
238
  cleanupBasicInfo(&pInfo->binfo);
188,166✔
239

240
  taosArrayDestroyEx(pInfo->pOutputBlocks, extWinDestroyBlockList);
188,166✔
241
  taosArrayDestroy(pInfo->pWins);
188,166✔
242
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
188,166✔
243
  taosArrayDestroy(pInfo->pWinRowIdx);
188,166✔
244
  
245
  taosArrayDestroy(pInfo->pPseudoColInfo);
188,166✔
246
  blockDataDestroy(pInfo->pTmpBlock);
188,166✔
247
  blockDataDestroy(pInfo->pEmptyInputBlock);
188,166✔
248

249
  extWinDestroyBlkNode(pInfo, pInfo->pLastBlkNode);
188,166✔
250
  if (pInfo->pFreeBlocks) {
188,166✔
251
    SListNode *node;
252
    while ((node = TD_DLIST_HEAD(pInfo->pFreeBlocks)) != NULL) {
1,249✔
253
      TD_DLIST_POP(pInfo->pFreeBlocks, node);
248✔
254
      extWinDestroyBlkNode(pInfo, node);
248✔
255
    }
256
    taosMemoryFree(pInfo->pFreeBlocks);
1,001✔
257
  }
258
  
259
  cleanupAggSup(&pInfo->aggSup);
188,166✔
260
  cleanupExprSupp(&pInfo->scalarSupp);
188,166✔
261
  taosMemoryFreeClear(pInfo->pResultRow);
188,166✔
262

263
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
188,166✔
264

265
  qDebug("ext window stat at destroy, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
188,166✔
266
      pInfo->stat.resBlockCreated, pInfo->stat.resBlockDestroyed, pInfo->stat.resBlockRecycled, 
267
      pInfo->stat.resBlockReused, pInfo->stat.resBlockAppend);
268

269
  taosMemoryFreeClear(pInfo);
188,166✔
270
}
271

272
static int32_t extWinOpen(SOperatorInfo* pOperator);
273
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
274

275
typedef struct SMergeAlignedExternalWindowOperator {
276
  SExternalWindowOperator* pExtW;
277
  int64_t curTs;
278
  SResultRow*  pResultRow;
279
} SMergeAlignedExternalWindowOperator;
280

281
static void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
17,477✔
282
  SMergeAlignedExternalWindowOperator* pMlExtInfo = (SMergeAlignedExternalWindowOperator*)pOperator;
17,477✔
283
  destroyExternalWindowOperatorInfo(pMlExtInfo->pExtW);
17,477✔
284
  taosMemoryFreeClear(pMlExtInfo);
17,477✔
285
}
17,477✔
286

287
/*
 * Return the raw timestamp array of column `primaryTsIndex` in pBlock, or NULL when the
 * block carries no loaded data. If the block's time window is still {0, 0} it is refreshed
 * with blockDataUpdateTsWindow.
 *
 * Does not return on failure: the error is stored in pTaskInfo->code and the function
 * long-jumps to pTaskInfo->env.
 */
static int64_t* extWinExtractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
  if (NULL == pBlock->pDataBlock || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
  if (NULL == pTsCol) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  TSKEY* tsCols = (int64_t*)pTsCol->pData;

  bool windowUnset = (0 == pBlock->info.window.skey) && (0 == pBlock->info.window.ekey);
  if (windowUnset) {
    int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
310

311
/* Current window index from the stream runtime info, or 0 when the task has no stream runtime info. */
static int32_t extWinGetCurWinIdx(SExecTaskInfo* pTaskInfo) {
  return (NULL != pTaskInfo->pStreamRuntimeInfo) ? pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx : 0;
}
317

NEW
318
/*
 * Advance the current window index in the stream runtime info by one.
 * A no-op when the task has no stream runtime info, mirroring the guards in
 * extWinGetCurWinIdx and extWinSetCurWinIdx.
 */
static void extWinIncCurWinIdx(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pTaskInfo->pStreamRuntimeInfo) {
    pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx++;
  }
}
×
322

323
/* Store `idx` as the current window index; silently does nothing without stream runtime info. */
static void extWinSetCurWinIdx(SOperatorInfo* pOperator, int32_t idx) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (NULL == pTaskInfo->pStreamRuntimeInfo) {
    return;
  }

  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
}
568,648,223✔
329

330

331
/* Advance the output window index (funcInfo.curOutIdx) by one. The caller must pass a non-NULL runtime info. */
static void extWinIncCurWinOutIdx(SStreamRuntimeInfo* pStreamRuntimeInfo) {
  pStreamRuntimeInfo->funcInfo.curOutIdx += 1;
}
10,099✔
334

335

NEW
336
/* Window that follows the current one in pExtW->pWins, or NULL when the current window is the last. */
static const STimeWindow* extWinGetNextWin(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
  int32_t curIdx = extWinGetCurWinIdx(pTaskInfo);
  int32_t nextIdx = curIdx + 1;

  if (nextIdx >= pExtW->pWins->size) {
    return NULL;
  }

  return taosArrayGet(pExtW->pWins, nextIdx);
}
341

342

343
/*
 * Record in pIdx where the rows of window `currWinIdx` start inside pBlock.
 *
 * Each pIdx element is 8 bytes holding two int32 values back to back:
 * {window index, first row of that window in the block}. Nothing is appended when the
 * last recorded entry already belongs to `currWinIdx`.
 *
 * @param pTaskInfo   used for the task id in the error log
 * @param pIdx        array of packed entries (element size sizeof(int64_t))
 * @param pBlock      result block that already contains the window's rows
 * @param currWinIdx  index of the window the rows belong to
 * @param rows        number of rows just added for this window; the start row is
 *                    pBlock->info.rows - rows
 * @return 0 on success, terrno when the push fails
 */
static int32_t extWinAppendWinIdx(SExecTaskInfo* pTaskInfo, SArray* pIdx, SSDataBlock* pBlock, int32_t currWinIdx, int32_t rows) {
  int32_t code = 0, lino = 0;

  // Read the window index of the last entry with memcpy instead of casting the
  // int64 slot to int32_t*, which keeps the byte layout but avoids aliasing issues.
  const void* pLast = taosArrayGetLast(pIdx);
  if (pLast != NULL) {
    int32_t lastWinIdx = 0;
    memcpy(&lastWinIdx, pLast, sizeof(lastWinIdx));
    if (lastWinIdx == currWinIdx) {
      return code;
    }
  }

  // Same in-memory layout as before: window index first, row index second.
  int32_t entry[2] = {currWinIdx, (int32_t)(pBlock->info.rows - rows)};

  TSDB_CHECK_NULL(taosArrayPush(pIdx, entry), code, lino, _exit, terrno);

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
369

370
static int32_t extWinRebuildWinIdxByFilter(SExecTaskInfo* pTaskInfo, SArray* pIdx, int32_t rowsBeforeFilter,
241✔
371
                                           SColumnInfoData* pFilterRes) {
372
  int32_t code = TSDB_CODE_SUCCESS;
241✔
373
  int32_t lino = 0;
241✔
374

375
  if (pIdx == NULL || pFilterRes == NULL || rowsBeforeFilter <= 0 || taosArrayGetSize(pIdx) == 0) {
241✔
NEW
376
    return code;
×
377
  }
378

379
  int32_t idxSize = (int32_t)taosArrayGetSize(pIdx);
241✔
380
  SArray* pNewIdx = taosArrayInit(idxSize, sizeof(int64_t));
241✔
381
  TSDB_CHECK_NULL(pNewIdx, code, lino, _exit, terrno);
241✔
382

383
  int8_t* pIndicator = (int8_t*)pFilterRes->pData;
241✔
384
  int32_t newRowStart = 0;
241✔
385
  for (int32_t i = 0; i < idxSize; ++i) {
482✔
386
    int64_t cur = *(int64_t*)taosArrayGet(pIdx, i);
241✔
387
    int32_t* pCurWinIdx = (int32_t*)&cur;
241✔
388
    int32_t* pCurRowIdx = pCurWinIdx + 1;
241✔
389

390
    int32_t startRow = *pCurRowIdx;
241✔
391
    int32_t endRow = rowsBeforeFilter;
241✔
392
    if (i + 1 < idxSize) {
241✔
NEW
393
      int64_t next = *(int64_t*)taosArrayGet(pIdx, i + 1);
×
NEW
394
      int32_t* pNextWinIdx = (int32_t*)&next;
×
NEW
395
      int32_t* pNextRowIdx = pNextWinIdx + 1;
×
NEW
396
      endRow = *pNextRowIdx;
×
397
    }
398

399
    startRow = TMIN(TMAX(startRow, 0), rowsBeforeFilter);
241✔
400
    endRow = TMIN(TMAX(endRow, 0), rowsBeforeFilter);
241✔
401
    if (endRow <= startRow) {
241✔
NEW
402
      continue;
×
403
    }
404

405
    int32_t survivedRows = 0;
241✔
406
    for (int32_t r = startRow; r < endRow; ++r) {
482✔
407
      if (pIndicator[r]) {
241✔
NEW
408
        survivedRows++;
×
409
      }
410
    }
411

412
    if (survivedRows <= 0) {
241✔
413
      continue;
241✔
414
    }
415

NEW
416
    int64_t out = 0;
×
NEW
417
    int32_t* pOutWinIdx = (int32_t*)&out;
×
NEW
418
    int32_t* pOutRowIdx = pOutWinIdx + 1;
×
NEW
419
    *pOutWinIdx = *pCurWinIdx;
×
NEW
420
    *pOutRowIdx = newRowStart;
×
NEW
421
    TSDB_CHECK_NULL(taosArrayPush(pNewIdx, &out), code, lino, _exit, terrno);
×
422

NEW
423
    newRowStart += survivedRows;
×
424
  }
425

426
  taosArrayClear(pIdx);
241✔
427
  int32_t newSize = (int32_t)taosArrayGetSize(pNewIdx);
241✔
428
  if (newSize > 0) {
241✔
NEW
429
    void* dest = taosArrayReserve(pIdx, newSize);
×
NEW
430
    TSDB_CHECK_NULL(dest, code, lino, _exit, terrno);
×
NEW
431
    memcpy(dest, pNewIdx->pData, (size_t)newSize * sizeof(int64_t));
×
432
  }
433

434
_exit:
241✔
435
  taosArrayDestroy(pNewIdx);
241✔
436
  if (code != TSDB_CODE_SUCCESS) {
241✔
NEW
437
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
438
  }
439
  return code;
241✔
440
}
441

442

443
static int32_t mergeAlignExtWinSetOutputBuf(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
3,079,428✔
444
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
445
  if (*pResult == NULL) {
3,079,428✔
446
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
17,925✔
447
    if (!*pResult) {
17,925✔
NEW
448
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
NEW
449
      return terrno;
×
450
    }
451
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
17,925✔
452
  }
453
  
454
  (*pResult)->win = *pWin;
3,079,428✔
455
  (*pResult)->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
3,079,428✔
456
  
457
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
3,078,950✔
458
}
459

460

461
/*
 * Find the window whose start key equals `ts`, scanning pExtW->pWins forward from the
 * current window index. On a match the current window index is moved to it and *ppWin set.
 *
 * @return TSDB_CODE_SUCCESS on a match; TSDB_CODE_STREAM_INTERNAL_ERROR when a window
 *         starting after `ts` is reached first or the array is exhausted.
 */
static int32_t mergeAlignExtWinGetWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts, STimeWindow** ppWin) {
  int32_t startIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);

  // TODO handle desc order
  for (int32_t i = startIdx; i < pExtW->pWins->size; ++i) {
    STimeWindow* pCandidate = taosArrayGet(pExtW->pWins, i);

    if (ts < pCandidate->skey) {
      // windows are scanned in ascending start order, so ts can no longer match
      qError("invalid ts %" PRId64 " for current window idx %d skey %" PRId64, ts, i, pCandidate->skey);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    if (ts == pCandidate->skey) {
      extWinSetCurWinIdx(pOperator, i);
      *ppWin = pCandidate;
      return TSDB_CODE_SUCCESS;
    }
  }

  qError("invalid ts %" PRId64 " to find merge aligned ext window, size:%d", ts, (int32_t)pExtW->pWins->size);
  return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
480

481
/*
 * Finalize the current result row into pResultBlock and, if it produced rows, record the
 * window's start position in pExtW->pWinRowIdx.
 *
 * @return 0 on success, otherwise the error from extWinAppendWinIdx
 */
static int32_t mergeAlignExtWinFinalizeResult(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pResultBlock) {
  int32_t                              code = 0, lino = 0;
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SResultRow*                          pRow = pMlExtInfo->pResultRow;

  finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, &pOperator->exprSupp, pResultBlock,
                     pOperator->pTaskInfo);

  if (pRow->numOfRows > 0) {
    TAOS_CHECK_EXIT(
        extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResultBlock, pRow->winIdx, pRow->numOfRows));
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
502

503
/*
 * Aggregate one input block for the merge-aligned external window operator.
 *
 * Rows are grouped by runs of equal timestamp; each run is matched to the window whose start
 * key equals that timestamp. When the timestamp changes, the pending result row is finalized
 * into pResultBlock and reset before the next window starts. The last run of the block is
 * applied but left open (pMlExtInfo->curTs remembers it) so a following block can continue it.
 *
 * On any error the code is stored in pTaskInfo->code and the function long-jumps to
 * pTaskInfo->env, so in practice it only returns 0.
 */
static int32_t mergeAlignExtWinAggDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;
  int32_t        code = 0, lino = 0;
  STimeWindow*   pWin = NULL;

  int32_t  startPos = 0;
  int64_t* tsCols = extWinExtractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
  TSKEY    ts = getStartTsKey(&pBlock->info.window, tsCols);

  code = mergeAlignExtWinGetWinFromTs(pOperator, pExtW, ts, &pWin);
  if (code) {
    qError("failed to get time window for ts:%" PRId64 ", prim ts index:%d, error:%s", ts, pExtW->primaryTsIndex, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

  // The block starts a different window than the one left open by the previous block: close that one first.
  if (pMlExtInfo->curTs != INT64_MIN && pMlExtInfo->curTs != pWin->skey) {
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
  }

  TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));

  int32_t currPos = startPos;
  pMlExtInfo->curTs = pWin->skey;

  while (++currPos < pBlock->info.rows) {
    if (tsCols[currPos] == pMlExtInfo->curTs) continue;

    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
      pMlExtInfo->curTs, startPos, currPos, tsCols[currPos]);
    // Timestamp changed: aggregate rows [startPos, currPos) into the current window and emit it.
    TAOS_CHECK_EXIT(applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
                                           currPos - startPos, pBlock->info.rows, pSup->numOfExprs));

    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));

    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[currPos], &pWin));

    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
    startPos = currPos;

    TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));

    pMlExtInfo->curTs = pWin->skey;
  }

  // Trailing run of the block: aggregated here, finalized later.
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
                                         currPos - startPos, pBlock->info.rows, pSup->numOfExprs);

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // Record the failure on the task before unwinding, like the other long-jump sites in this file.
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return code;
}
565

NEW
566
/*
 * Build the window/row index for a projected block: for every distinct consecutive timestamp
 * in pInput, locate its window and record where its rows start in pResult.
 *
 * pOperator must be the merge-aligned operator: pOperator->info is a
 * SMergeAlignedExternalWindowOperator, and the real operator state is reached through its
 * pExtW member (the same way mergeAlignExtWinDo obtains it).
 *
 * @return 0 on success, otherwise the error from the window lookup or the index append
 */
static int32_t mergeAlignExtWinBuildWinRowIdx(SOperatorInfo* pOperator, SSDataBlock* pInput, SSDataBlock* pResult) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  int64_t*     tsCols = extWinExtractTsCol(pInput, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  STimeWindow* pWin = NULL;
  int32_t      code = 0, lino = 0;
  int64_t      prevTs = INT64_MIN;

  for (int32_t i = 0; i < pInput->info.rows; ++i) {
    // rows sharing the previous timestamp belong to the window already recorded
    if (prevTs == tsCols[i]) {
      continue;
    }

    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[i], &pWin));
    // "rows" is the number of rows from i to the end of the input, so the recorded start row
    // is pResult->info.rows - (pInput->info.rows - i)
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResult, extWinGetCurWinIdx(pOperator->pTaskInfo), pInput->info.rows - i));

    prevTs = tsCols[i];
  }

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
592

NEW
593
/*
 * Scalar (projection) path of the merge-aligned operator: evaluate the scalar expressions of
 * pBlock into pResultBlock, then build the window/row index for the produced rows.
 *
 * pOperator->info is a SMergeAlignedExternalWindowOperator; the operator state (and its
 * scalarSupp) is reached through the wrapper's pExtW member.
 * pResultRowInfo is not used by this path.
 *
 * @return 0 on success, otherwise the first error encountered
 */
static int32_t mergeAlignExtWinProjectDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                            SSDataBlock* pResultBlock) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SExprSupp*                           pExprSup = &pExtW->scalarSupp;
  int32_t                              code = 0, lino = 0;

  TAOS_CHECK_EXIT(projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
                        GET_STM_RTINFO(pOperator->pTaskInfo)));

  TAOS_CHECK_EXIT(mergeAlignExtWinBuildWinRowIdx(pOperator, pBlock, pResultBlock));

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
612

613
/*
 * Pull blocks from the downstream operator and process them until either the input is
 * exhausted or the result block reaches the operator's row threshold.
 *
 * The window/row index and the result block are cleared first. When the input ends in
 * aggregate mode, the window left open by the last block is finalized and the operator is
 * marked completed. On return the stream runtime info points at the rebuilt window/row index.
 *
 * Errors are stored in pTaskInfo->code and raised with a long jump to pTaskInfo->env.
 */
static void mergeAlignExtWinDo(SOperatorInfo* pOperator) {
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  int32_t                              code = 0;
  SSDataBlock*                         pRes = pExtW->binfo.pRes;
  SExprSupp*                           pSup = &pOperator->exprSupp;
  int32_t                              lino = 0;

  taosArrayClear(pExtW->pWinRowIdx);
  blockDataCleanup(pRes);

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);

    if (pBlock == NULL) {
      // close last time window
      if (pMlExtInfo->curTs != INT64_MIN && EEXT_MODE_AGG == pExtW->mode) {
        TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, &pExtW->binfo.resultRowInfo, pRes));
      }
      setOperatorCompleted(pOperator);
      break;
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _exit);

    printDataBlock(pBlock, __func__, "externalwindowAlign", pTaskInfo->id.queryId);
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);

    if (EEXT_MODE_SCALAR == pExtW->mode) {
      TAOS_CHECK_EXIT(mergeAlignExtWinProjectDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
    } else {
      TAOS_CHECK_EXIT(mergeAlignExtWinAggDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
    }

    // enough rows collected for one output batch
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }

  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
19,090✔
666

667
/*
 * "Get next" entry point of the merge-aligned external window operator.
 *
 * Returns NULL in *ppRes once the operator is done. On the first call after a reset
 * (pWins empty) the window array is filled from the stream trigger parameters: every
 * window is [wstart, wstart + 1]. Then one batch is produced with mergeAlignExtWinDo.
 *
 * @param ppRes  out: the operator's result block, or NULL when it holds no rows
 * @return 0 on success; on failure the code is stored in pTaskInfo->code and the function
 *         long-jumps to pTaskInfo->env.
 */
static int32_t mergeAlignExtWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  int32_t                              code = 0;
  int32_t                              lino = 0;

  if (OP_EXEC_DONE == pOperator->status) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pRes = pExtW->binfo.pRes;
  blockDataCleanup(pRes);

  if (taosArrayGetSize(pExtW->pWins) <= 0) {
    // Lazily materialise one window per trigger parameter.
    size_t       numParams = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
    STimeWindow* pSlots = taosArrayReserve(pExtW->pWins, numParams);
    TSDB_CHECK_NULL(pSlots, code, lino, _exit, terrno);

    for (int32_t i = 0; i < numParams; ++i) {
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
      STimeWindow*        pSlot = &pSlots[i];
      pSlot->skey = pParam->wstart;
      pSlot->ekey = pParam->wstart + 1;
    }

    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  }

  mergeAlignExtWinDo(pOperator);

  size_t rows = pRes->info.rows;
  if (0 == rows) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pRes;
  }

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
710

711
/*
 * Reset-state callback of the merge-aligned external window operator.
 *
 * Puts the operator back into OP_NOT_OPENED, clears the window list and the basic operator
 * state, forgets the current result row / timestamp, then rebuilds the aggregate supporter
 * and the time-window helper column.
 *
 * Returns 0 on success, otherwise the error code of the first failing step.
 */
static int32_t resetMergeAlignedExtWinOperator(SOperatorInfo* pOperator) {
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedIntervalPhysiNode*      pPhynode = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;

  pOperator->status = OP_NOT_OPENED;

  taosArrayClear(pExtW->pWins);

  resetBasicOperatorState(&pExtW->binfo);
  pMlExtInfo->pResultRow = NULL;
  pMlExtInfo->curTs = INT64_MIN;

  int32_t code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                             &pTaskInfo->storageAPI.functionStore);
  if (code != 0) {
    return code;
  }

  // Recreate the helper column only once the aggregate supporter is back in place.
  colDataDestroy(&pExtW->twAggSup.timeWindowData);
  return initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
}
733

734
int32_t createStreamMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
17,477✔
735
                                                       SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
736
  SMergeAlignedIntervalPhysiNode* pPhynode = (SMergeAlignedIntervalPhysiNode*)pNode;
17,477✔
737
  int32_t code = 0;
17,477✔
738
  int32_t lino = 0;
17,477✔
739
  SMergeAlignedExternalWindowOperator* pMlExtInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
17,477✔
740
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
17,477✔
741

742
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
17,477✔
743
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
17,477✔
744
  }
745
  pOperator->pPhyNode = pNode;
17,477✔
746
  if (!pMlExtInfo || !pOperator) {
17,477✔
NEW
747
    code = terrno;
×
NEW
748
    goto _error;
×
749
  }
750
  initOperatorCostInfo(pOperator);
17,477✔
751

752
  pMlExtInfo->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
17,477✔
753
  if (!pMlExtInfo->pExtW) {
17,477✔
NEW
754
    code = terrno;
×
NEW
755
    goto _error;
×
756
  }
757

758
  SExternalWindowOperator* pExtW = pMlExtInfo->pExtW;
17,477✔
759
  SExprSupp* pSup = &pOperator->exprSupp;
17,477✔
760
  pSup->hasWindowOrGroup = true;
17,477✔
761
  pSup->hasWindow = true;
17,477✔
762
  pMlExtInfo->curTs = INT64_MIN;
17,477✔
763

764
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
17,477✔
765
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
17,477✔
766
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
17,477✔
767
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
17,477✔
768

769
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
17,477✔
770
  initResultSizeInfo(&pOperator->resultInfo, 4096);
17,477✔
771

772
  int32_t num = 0;
17,229✔
773
  SExprInfo* pExprInfo = NULL;
17,229✔
774
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
17,477✔
775
  QUERY_CHECK_CODE(code, lino, _error);
17,477✔
776

777
  if (pExtW->mode == EEXT_MODE_AGG) {
17,477✔
778
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, NULL,
17,477✔
779
                      &pTaskInfo->storageAPI.functionStore);
780
    QUERY_CHECK_CODE(code, lino, _error);
17,477✔
781
  }
782

783
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
17,477✔
784
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
17,477✔
785
  initBasicInfo(&pExtW->binfo, pResBlock);
17,477✔
786

787
  pExtW->pWins = taosArrayInit(4096, sizeof(STimeWindow));
17,477✔
788
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
17,477✔
789

790
  pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
17,477✔
791
  TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
17,477✔
792

793
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
17,477✔
794
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
17,477✔
795
  QUERY_CHECK_CODE(code, lino, _error);
17,477✔
796
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMlExtInfo, pTaskInfo);
17,477✔
797
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignExtWinNext, NULL,
17,477✔
798
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
799
                                         optrDefaultGetNextExtFn, NULL);
800
  setOperatorResetStateFn(pOperator, resetMergeAlignedExtWinOperator);
17,477✔
801

802
  code = appendDownstream(pOperator, &pDownstream, 1);
17,477✔
803
  QUERY_CHECK_CODE(code, lino, _error);
17,477✔
804
  *ppOptrOut = pOperator;
17,477✔
805
  return code;
17,477✔
806
  
NEW
807
_error:
×
NEW
808
  if (pMlExtInfo) destroyMergeAlignedExternalWindowOperator(pMlExtInfo);
×
NEW
809
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
NEW
810
  pTaskInfo->code = code;
×
NEW
811
  return code;
×
812
}
813

814
static int32_t resetExternalWindowExprSupp(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
767,650✔
815
                                           SExternalWindowPhysiNode* pPhynode) {
816
  int32_t    code = 0, lino = 0, num = 0;
767,650✔
817
  SExprInfo* pExprInfo = NULL;
767,650✔
818
  cleanupExprSuppWithoutFilter(&pExtW->scalarSupp);
767,524✔
819

820
  SNodeList* pNodeList = NULL;
767,740✔
821
  if (pPhynode->window.pProjs) {
767,740✔
822
    pNodeList = pPhynode->window.pProjs;
8,850✔
823
  } else {
824
    pNodeList = pPhynode->window.pExprs;
758,890✔
825
  }
826

827
  code = createExprInfo(pNodeList, NULL, &pExprInfo, &num);
767,740✔
828
  QUERY_CHECK_CODE(code, lino, _error);
767,740✔
829
  code = initExprSupp(&pExtW->scalarSupp, pExprInfo, num, &pTaskInfo->storageAPI.functionStore);
767,740✔
830
  QUERY_CHECK_CODE(code, lino, _error);
767,740✔
831
  return code;
767,740✔
NEW
832
_error:
×
NEW
833
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
834
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
NEW
835
    pTaskInfo->code = code;
×
836
  }
NEW
837
  return code;
×
838
}
839

840
/*
 * Reset-state callback of the external window operator.
 *
 * Returns the operator to OP_NOT_OPENED, re-initialises the result row info, clears the
 * window bookkeeping and the window list, recycles the last block node, rebuilds the scalar
 * expression supporter and the time-window helper column, and finally resets the output
 * cursor fields. The fields assigned after the two checked calls keep their old values when
 * one of those calls fails.
 *
 * NOTE: binfo.pRes capacity and the aggregate supporter are not rebuilt here.
 *
 * Returns 0 on success, otherwise the error code of the failing step (also logged).
 */
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
  int32_t                   code = 0, lino = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pOperator->pPhyNode;
  SExternalWindowOperator*  pExtW = pOperator->info;

  pOperator->status = OP_NOT_OPENED;

  initResultRowInfo(&pExtW->binfo.resultRowInfo);

  // Window bookkeeping.
  pExtW->outputWinNum = 0;
  pExtW->lastWinId = -1;
  pExtW->outputWinId = 0;
  taosArrayClear(pExtW->pWins);
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);

  TAOS_CHECK_EXIT(resetExternalWindowExprSupp(pExtW, pTaskInfo, pPhynode));
  colDataDestroy(&pExtW->twAggSup.timeWindowData);
  TAOS_CHECK_EXIT(initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window));

  // Output cursor.
  pExtW->isDynWindow = false;
  pExtW->lastEKey = INT64_MIN;
  pExtW->lastSKey = INT64_MIN;
  pExtW->outWinIdx = 0;

  qDebug("%s ext window stat at reset, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64,
         pTaskInfo->id.str, pExtW->stat.resBlockCreated, pExtW->stat.resBlockDestroyed, pExtW->stat.resBlockRecycled,
         pExtW->stat.resBlockReused, pExtW->stat.resBlockAppend);

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
885

886
/*
 * Node-walker callback: sets *(bool*)res to true and stops the walk (DEAL_RES_END) at the
 * first function node whose funcId - or, when hasOriginalFunc is set, whose originalFuncId -
 * is a count-like function. Every other node lets the walk continue.
 */
static EDealRes extWinHasCountLikeFunc(SNode* pNode, void* res) {
  if (nodeType(pNode) != QUERY_NODE_FUNCTION) {
    return DEAL_RES_CONTINUE;
  }

  SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
  bool           isCountLike = fmIsCountLikeFunc(pFuncNode->funcId) ||
                     (pFuncNode->hasOriginalFunc && fmIsCountLikeFunc(pFuncNode->originalFuncId));
  if (!isCountLike) {
    return DEAL_RES_CONTINUE;
  }

  *(bool*)res = true;
  return DEAL_RES_END;
}
896

897

898
/*
 * Build a one-row, all-NULL input block for the operator.
 *
 * Nothing is created (and *ppBlock is left untouched) unless tsCountAlwaysReturnValue is set
 * and the operator has a count-like function - presumably so such functions can still emit a
 * value when a window has no input; confirm against the caller.
 *
 * The block gets one NULL-typed column for every slot up to the highest column slot id
 * referenced by the operator's expressions; row 0 of every column is set to NULL.
 *
 * Returns 0 on success. On failure the partially built block is destroyed, the error is
 * logged with the line of the failing check, and the error code is returned.
 */
static int32_t extWinCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SSDataBlock* pBlock = NULL;
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SExternalWindowOperator* pExtW = pOperator->info;

  if (!pExtW->hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  // Descriptor copied into every padded column slot.
  SColumnInfoData nullColInfo = {0};
  nullColInfo.hasNull = true;
  nullColInfo.info.type = TSDB_DATA_TYPE_NULL;
  nullColInfo.info.bytes = 1;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        continue;  // only column parameters need a slot in the block
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId < numOfCols) {
        continue;  // slot already present
      }

      code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
      QUERY_CHECK_CODE(code, lino, _end);

      // Pad the column list up to and including slotId.
      for (int32_t k = numOfCols; k < slotId + 1; ++k) {
        void* tmp = taosArrayPush(pBlock->pDataBlock, &nullColInfo);
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      }
    }
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
    colDataSetNULL(pColInfoData, 0);
  }
  *ppBlock = pBlock;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    // Report the line recorded by the failing check, not the line of this log statement.
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
964

965

966

967
static int extWinTsWinCompare(const void* pLeft, const void* pRight) {
9,258,241✔
968
  int64_t ts = *(int64_t*)pLeft;
9,258,241✔
969
  SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)pRight;
9,258,241✔
970
  if (ts < pWin->tw.skey) {
9,258,241✔
971
    return -1;
4,865,161✔
972
  }
973
  if (ts >= pWin->tw.ekey) {
4,393,080✔
974
    return 1;
455,103✔
975
  }
976

977
  return 0;
3,937,977✔
978
}
979

980

981
/*
 * Find the window that holds the first usable row of a block (multi-table, non-overlapping).
 *
 * A search over pExtW->pWins for tsCol[0] is tried first; on a hit *startPos is set to 0.
 * Otherwise rows 1..rowNum-1 are scanned together with the window list, which is only ever
 * advanced forward, and the first (row, window) pair where the row falls inside the window
 * wins; *startPos is then that row.
 *
 * Returns the window index, or -1 when no row of the block falls inside any window
 * (*startPos is left untouched in that case).
 */
static int32_t extWinGetMultiTbWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol, int64_t rowNum, int32_t* startPos) {
  int32_t hit = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
  if (hit >= 0) {
    *startPos = 0;
    return hit;
  }

  int32_t winIdx = 0;
  for (int64_t row = 1; row < rowNum; ++row) {
    while (winIdx < pExtW->pWins->size) {
      SExtWinTimeWindow* pCand = TARRAY_GET_ELEM(pExtW->pWins, winIdx);
      if (tsCol[row] < pCand->tw.skey) {
        // Row is before this window: try the next row against the same window.
        break;
      }

      if (tsCol[row] < pCand->tw.ekey) {
        *startPos = row;
        return winIdx;
      }

      ++winIdx;
    }
  }

  return -1;
}
1006

1007
/*
 * Window getter for non-overlapping external windows.
 *
 * Starting at row *startPos and at the window after the previously returned one (or at the
 * task's current window when blkWinIdx is negative), finds the next window that the block
 * has a row for. On a hit *ppWin is the window, *startPos the first row attributed to it and
 * *winRows the row count reported by getNumOfRowsInTimeWindow(); the task's current window
 * index is updated as well. When nothing is found *ppWin is set to NULL.
 *
 * Without a stream runtime, the first row at or after a window's skey selects that window
 * even when the row is at or past its ekey.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t extWinGetNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  if (*startPos >= pInfo->rows) {
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // First call for this block resumes at the task's current window, later calls move on.
  if (pExtW->blkWinIdx >= 0) {
    pExtW->blkWinIdx++;
  } else {
    pExtW->blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
  }

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t row = *startPos;

  qDebug("%s %s start to get novlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, row);

  // TODO handle desc order
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // Skip rows that precede this window.
    while (row < pInfo->rows && tsCol[row] < pWin->tw.skey) {
      ++row;
    }

    if (row == pInfo->rows) {
      break;
    }

    // The row is inside the window, or - without a stream runtime - already past its end.
    if (tsCol[row] < pWin->tw.ekey || !pOperator->pTaskInfo->pStreamRuntimeInfo) {
      extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
      *ppWin = pWin;
      *startPos = row;
      *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

      qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
             GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, row, tsCol[row], tsCol[row + *winRows - 1]);

      return TSDB_CODE_SUCCESS;
    }
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
1087

1088
/*
 * Window getter for overlapping external windows.
 *
 * Starts at blkWinStartIdx on the first call for a block (blkWinIdx negative) and at the next
 * window afterwards. For every candidate window the block is rescanned from blkRowStartIdx,
 * which is pushed forward past rows that precede the window. On a hit *ppWin, *startPos and
 * *winRows are filled and the task's current window index is updated; when the window's rows
 * end before the block does, blkWinStartIdx is advanced to the following window and
 * blkWinStartSet is raised. When the rows run out first, blkWinStartIdx is pinned to the
 * current window unless blkWinStartSet is already set. *ppWin is NULL when nothing is found.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t extWinGetOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  if (pExtW->blkWinIdx >= 0) {
    pExtW->blkWinIdx++;
  } else {
    pExtW->blkWinIdx = pExtW->blkWinStartIdx;
  }

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int64_t row = 0;

  qDebug("%s %s start to get ovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);

  // TODO handle desc order
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // Rows before this window can never match a later window either: move the start mark.
    row = pExtW->blkRowStartIdx;
    while (row < pInfo->rows && tsCol[row] < pWin->tw.skey) {
      ++row;
      pExtW->blkRowStartIdx = row;
    }

    if (row >= pInfo->rows) {
      if (!pExtW->blkWinStartSet) {
        pExtW->blkWinStartIdx = pExtW->blkWinIdx;
      }

      break;
    }

    if (tsCol[row] >= pWin->tw.ekey) {
      continue;  // first candidate row is already past this window
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = pWin;
    *startPos = row;
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)row, tsCol[row], tsCol[row + *winRows - 1]);

    if ((row + *winRows) < pInfo->rows) {
      pExtW->blkWinStartIdx = pExtW->blkWinIdx + 1;
      pExtW->blkWinStartSet = true;
    }

    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
1159

1160

1161
/*
 * Window getter for non-overlapping external windows with multi-table input.
 *
 * On the first call for a block (blkWinIdx negative) the starting window and row are looked
 * up through extWinGetMultiTbWinFromTs(). Later calls continue from the next window and from
 * row *startPos. On a hit *ppWin, *startPos and *winRows are filled and the task's current
 * window index is updated; otherwise *ppWin is set to NULL.
 *
 * Without a stream runtime, the first row at or after a window's skey selects that window
 * even when the row is at or past its ekey.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t extWinGetMultiTbNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  if (*startPos >= pInfo->rows) {
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  if (pExtW->blkWinIdx < 0) {
    // First lookup for this block: locate the starting window from the timestamps.
    pExtW->blkWinIdx = extWinGetMultiTbWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
    if (pExtW->blkWinIdx < 0) {
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block",
             GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
      *ppWin = NULL;
      return TSDB_CODE_SUCCESS;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  pExtW->blkWinIdx++;

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t row = *startPos;

  qDebug("%s %s start to get mnovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, row);

  // TODO handle desc order
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // Skip rows that precede this window.
    while (row < pInfo->rows && tsCol[row] < pWin->tw.skey) {
      ++row;
    }

    if (row == pInfo->rows) {
      break;
    }

    // The row is inside the window, or - without a stream runtime - already past its end.
    if (tsCol[row] < pWin->tw.ekey || !pOperator->pTaskInfo->pStreamRuntimeInfo) {
      extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
      *ppWin = pWin;
      *startPos = row;
      *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

      qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
             GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, row, tsCol[row], tsCol[row + *winRows - 1]);

      return TSDB_CODE_SUCCESS;
    }
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
1256

1257
/*
 * Find the first window that holds the first usable row of a block (overlapping windows).
 *
 * A search over pExtW->pWins for tsCol[0] is tried first. On a hit the index is walked
 * backwards while the preceding window also contains tsCol[0], so the lowest index of that
 * contiguous run is returned and *startPos is set to 0. Otherwise rows 1..rowNum-1 are
 * scanned together with the window list, which is only ever advanced forward, and the first
 * (row, window) pair where the row falls inside the window wins; *startPos is then that row.
 *
 * Returns the window index, or -1 when no row of the block falls inside any window
 * (*startPos is left untouched in that case).
 */
static int32_t extWinGetFirstWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol,
                                       int64_t rowNum, int32_t* startPos) {
  int32_t first = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
  if (first >= 0) {
    // Earlier windows may overlap the hit: rewind to the first one containing the timestamp.
    while (first > 0) {
      SExtWinTimeWindow* pPrev = TARRAY_GET_ELEM(pExtW->pWins, first - 1);
      if (extWinTsWinCompare(tsCol, pPrev) != 0) {
        break;
      }
      --first;
    }

    *startPos = 0;
    return first;
  }

  int32_t winIdx = 0;
  for (int64_t row = 1; row < rowNum; ++row) {
    while (winIdx < pExtW->pWins->size) {
      SExtWinTimeWindow* pCand = TARRAY_GET_ELEM(pExtW->pWins, winIdx);
      if (tsCol[row] < pCand->tw.skey) {
        // Row is before this window: try the next row against the same window.
        break;
      }

      if (tsCol[row] < pCand->tw.ekey) {
        *startPos = row;
        return winIdx;
      }

      ++winIdx;
    }
  }

  return -1;
}
1292

1293
/*
 * Window getter for overlapping external windows with multi-table input.
 *
 * On the first call for a block (blkWinIdx negative) the starting window and row are looked
 * up through extWinGetFirstWinFromTs(). Later calls continue from the next window; for every
 * candidate window the block is rescanned from blkRowStartIdx, which is pushed forward past
 * rows that precede the window. On a hit *ppWin, *startPos and *winRows are filled and the
 * task's current window index is updated; otherwise *ppWin is set to NULL.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t extWinGetMultiTbOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  if (pExtW->blkWinIdx < 0) {
    // First lookup for this block: locate the starting window from the timestamps.
    pExtW->blkWinIdx = extWinGetFirstWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
    if (pExtW->blkWinIdx < 0) {
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block",
             GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
      *ppWin = NULL;
      return TSDB_CODE_SUCCESS;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  pExtW->blkWinIdx++;

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int64_t row = 0;

  qDebug("%s %s start to get movlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);

  // TODO handle desc order
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // Rows before this window can never match a later window either: move the start mark.
    row = pExtW->blkRowStartIdx;
    while (row < pInfo->rows && tsCol[row] < pWin->tw.skey) {
      ++row;
      pExtW->blkRowStartIdx = row;
    }

    if (row >= pInfo->rows) {
      break;
    }

    if (tsCol[row] >= pWin->tw.ekey) {
      continue;  // first candidate row is already past this window
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = pWin;
    *startPos = row;
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)row, tsCol[row], tsCol[row + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
1370

1371

NEW
1372
/*
 * Find the first row after lastEndPos whose timestamp is at or after win.skey.
 *
 * Return values:
 *    0  a row was found; *nextPos is its index.
 *   -2  the window ends at or before the block start, or at or before the next candidate row.
 *   -1  the window starts after the block end, no row at or after win.skey is left in the
 *       block, or `order` is not TSDB_ORDER_ASC.
 *
 * The scan is bounded by pBlockInfo->rows. Only ascending order is handled: the unbounded
 * loop this replaces had no exit condition for other orders and walked past the end of tsCol.
 * NOTE(review): -1 for non-ascending input is a conservative choice - confirm with callers.
 */
static int32_t extWinGetWinStartPos(STimeWindow win, const SDataBlockInfo* pBlockInfo, int32_t lastEndPos, int32_t order, int32_t* nextPos, int64_t* tsCol) {
  bool ascQuery = order == TSDB_ORDER_ASC;
  if (!ascQuery) {
    return -1;
  }

  if (win.ekey <= pBlockInfo->window.skey) {
    return -2;
  }

  if (win.skey > pBlockInfo->window.ekey) {
    return -1;
  }

  for (int64_t pos = (int64_t)lastEndPos + 1; pos < pBlockInfo->rows; ++pos) {
    if (win.ekey <= tsCol[pos]) {
      return -2;
    }

    if (win.skey <= tsCol[pos]) {
      *nextPos = (int32_t)pos;
      return 0;
    }
  }

  // Ran out of rows without reaching win.skey.
  return -1;
}
1392

1393
static int32_t extWinAggSetWinOutputBuf(SOperatorInfo* pOperator, SExtWinTimeWindow* win, SExprSupp* pSupp, 
281,716,273✔
1394
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
1395
  int32_t code = 0, lino = 0;
281,716,273✔
1396
  SResultRow* pResultRow = NULL;
281,716,273✔
1397
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
281,716,273✔
1398
  
1399
#if 0
1400
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
1401
                                                  true, tableGroupId, pTaskInfo, true, pAggSup, true);
1402
  if (pResultRow == NULL) {
1403
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
1404
    return pTaskInfo->code;
1405
  }
1406

1407
  qDebug("current result rows num:%d", tSimpleHashGetSize(pAggSup->pResultRowHashTable));
1408

1409
#else
1410
  if (win->resWinIdx >= 0) {
281,732,536✔
1411
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->resWinIdx * pAggSup->resultRowSize);
14,797,441✔
1412
  } else {
1413
    win->resWinIdx = pExtW->outWinIdx++;
266,921,194✔
1414
    
1415
    qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->resWinIdx);
266,936,388✔
1416

1417
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->resWinIdx * pAggSup->resultRowSize);
266,908,249✔
1418
    
1419
    memset(pResultRow, 0, pAggSup->resultRowSize);
266,937,091✔
1420

1421
    pResultRow->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
266,929,754✔
1422
    TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
266,925,593✔
1423
  }
1424
#endif
1425

1426
  // set time window for current result
1427
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
281,730,287✔
1428

1429
_exit:
281,732,072✔
1430
  
1431
  if (code) {
281,732,072✔
NEW
1432
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1433
  }
1434

1435
  return code;
281,676,426✔
1436
}
1437

1438
static int32_t extWinAggDo(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
283,160,434✔
1439
                                  SSDataBlock* pInputBlock) {
1440
  if (pOperator->pTaskInfo->pStreamRuntimeInfo && forwardRows == 0) {
283,160,434✔
NEW
1441
    return TSDB_CODE_SUCCESS;
×
1442
  }
1443

1444
  SExprSupp*               pSup = &pOperator->exprSupp;
283,177,273✔
1445
  SExternalWindowOperator* pExtW = pOperator->info;
283,146,168✔
1446
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
566,069,550✔
1447
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
283,182,811✔
1448

1449
}
1450

1451
// Decide whether the most recently opened output slot (outWinIdx - 1) should
// be reused for the next window instead of opening a new slot.
//
// Returns false when no slot has been opened yet, or in multi-table mode with
// unordered input. Otherwise returns true when there is no time-range
// expression needing calculation, when the slot's block list is empty, or when
// the last window index recorded for the slot's tail block lies before
// blkWinStartIdx.
static bool extWinLastWinClosed(SExternalWindowOperator* pExtW) {
  bool unorderedMultiTable = pExtW->multiTableMode && !pExtW->inputHasOrder;
  if (pExtW->outWinIdx <= 0 || unorderedMultiTable) {
    return false;
  }

  bool needCalcRange = (NULL != pExtW->timeRangeExpr) && pExtW->timeRangeExpr->needCalc;
  if (!needCalcRange) {
    return true;
  }

  SList* pLastList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outWinIdx - 1);
  if (0 == listNEles(pLastList)) {
    return true;
  }

  // Each list node stores two pointers: the data block followed by the
  // window-index array for that block.
  SListNode* pTail = listTail(pLastList);
  SArray*    pTailWinIdx = ((SArray**)pTail->data)[1];

  // Entries are 64-bit; the window index is read from the first 32 bits.
  int64_t* pLastEntry = taosArrayGetLast(pTailWinIdx);
  if (NULL == pLastEntry) {
    return false;
  }

  return *(int32_t*)pLastEntry < pExtW->blkWinStartIdx;
}
1474

1475
// Find (or create) the output block list of window pWin and return, through
// ppRes / ppIdx, the block and window-index array obtained from
// extWinGetLastBlockFromList() for `rows` more rows.
//
// Slot selection:
//   - the window already owns a slot (resWinIdx >= 0): reuse it;
//   - the last opened slot is considered closed: share slot outWinIdx - 1;
//   - otherwise: open slot outWinIdx with a fresh list, recycling whatever
//     list was previously stored there.
//
// When a new slot is opened, both the slot pointer and the list allocation are
// validated before pWin->resWinIdx / pExtW->outWinIdx are modified, so a
// failure leaves the operator's bookkeeping unchanged.
static int32_t extWinGetWinResBlock(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SList*                   pList = NULL;
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  
  if (pWin->resWinIdx >= 0) {
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->resWinIdx);
  } else if (extWinLastWinClosed(pExtW)) {
    pWin->resWinIdx = pExtW->outWinIdx - 1;
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->resWinIdx);
  } else {
    // Validate the destination slot before allocating, so nothing leaks if the
    // index is out of range.
    SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pExtW->outWinIdx);
    TSDB_CHECK_NULL(ppList, code, lino, _exit, terrno);

    // Each node holds two pointers (block + window-index array).
    pList = tdListNew(POINTER_BYTES * 2);
    TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);

    // Commit the new slot only after every fallible step succeeded.
    pWin->resWinIdx = pExtW->outWinIdx++;
    extWinRecycleBlockList(pExtW, ppList);
    *ppList = pList;
  }
  
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1506

1507
// Project `rows` rows of pInputBlock, starting at startPos, into the result
// block of window pWin.
//
// Steps: fetch the window's result block, copy the row range into the
// operator's reusable temporary block, run the scalar projection from the
// temporary block into the result block, then record the window index for the
// appended rows.
static int32_t extWinProjectDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExprSupp*               pProjSup = &pExtW->scalarSupp;
  SSDataBlock*             pOutBlock = NULL;
  SArray*                  pOutWinIdx = NULL;

  TAOS_CHECK_EXIT(extWinGetWinResBlock(pOperator, rows, pWin, &pOutBlock, &pOutWinIdx));

  qDebug("%s %s win[%" PRId64 ", %" PRId64 "] got res block %p winRowIdx %p, resWinIdx:%d, capacity:%d", 
      pOperator->pTaskInfo->id.str, __func__, pWin->tw.skey, pWin->tw.ekey, pOutBlock, pOutWinIdx, pWin->resWinIdx, pOutBlock->info.capacity);

  // Lazily create the temporary block on first use; afterwards just reset it.
  if (pExtW->pTmpBlock) {
    blockDataCleanup(pExtW->pTmpBlock);
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
  }

  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));

  qDebug("%s %s start to copy %d rows to tmp blk", pOperator->pTaskInfo->id.str, __func__, rows);
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));

  qDebug("%s %s start to apply project to tmp blk", pOperator->pTaskInfo->id.str, __func__);
  TAOS_CHECK_EXIT(projectApplyFunctionsWithSelect(pProjSup->pExprInfo, pOutBlock, pExtW->pTmpBlock, pProjSup->pCtx,
                                                  pProjSup->numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo),
                                                  true, pProjSup->hasIndefRowsFunc));

  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pOutWinIdx, pOutBlock,
                                     extWinGetCurWinIdx(pOperator->pTaskInfo), rows));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
    return code;
  }

  qDebug("%s %s project succeed", pOperator->pTaskInfo->id.str, __func__);
  return code;
}
1546

1547
// Feed one input block through the projection path: repeatedly ask the
// operator's window locator (getWinFp) for the next window covering rows of
// the block and project that row range, until the locator reports no window.
static int32_t extWinProjectOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  int64_t*                 pTsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  bool                     isAscScan = (TSDB_ORDER_ASC == pExtW->binfo.inputTsOrder);
  SExtWinTimeWindow*       pCurWin = NULL;
  int32_t                  rowPos = 0;
  int32_t                  numRows = 0;

  for (;;) {
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, pTsCol, &rowPos, &pInputBlock->info, &pCurWin, &numRows));
    if (NULL == pCurWin) {
      break;  // no further window intersects this block
    }

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") project start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pOperator->pTaskInfo), pCurWin->tw.skey, pCurWin->tw.ekey, isAscScan, rowPos, numRows);        

    TAOS_CHECK_EXIT(extWinProjectDo(pOperator, pInputBlock, rowPos, numRows, pCurWin));

    // continue the search right after the rows just consumed
    rowPos += numRows;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1577

NEW
1578
// Evaluate the indefinite-rows expressions of the operator on pBlock and
// append the output to pRes.
//
// If the operator has scalar pre-expressions they are applied to pBlock in
// place first. pRes is grown to hold its current rows plus pBlock's rows
// before the main expressions are applied.
static int32_t extWinIndefRowsDoImpl(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExprSupp*               pMainSup = &pOperator->exprSupp;
  SExprSupp*               pPreSup = &pExtW->scalarSupp;
  int32_t                  tsOrder = pExtW->binfo.inputTsOrder;
  int32_t                  blkScanFlag = pBlock->info.scanFlag;  // captured before the in-place projection

  if (NULL != pPreSup->pExprInfo) {
    TAOS_CHECK_EXIT(projectApplyFunctions(pPreSup->pExprInfo, pBlock, pBlock, pPreSup->pCtx, pPreSup->numOfExprs,
                                          pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
  }

  TAOS_CHECK_EXIT(setInputDataBlock(pMainSup, pBlock, tsOrder, blkScanFlag, false));

  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));

  TAOS_CHECK_EXIT(projectApplyFunctions(pMainSup->pExprInfo, pRes, pBlock, pMainSup->pCtx, pMainSup->numOfExprs,
                                        pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1608

NEW
1609
static int32_t extWinIndefRowsSetWinOutputBuf(SExternalWindowOperator* pExtW, SExtWinTimeWindow* win, SExprSupp* pSupp, 
×
1610
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo, bool reset) {
NEW
1611
  int32_t code = 0, lino = 0;
×
NEW
1612
  SResultRow* pResultRow = NULL;
×
1613

NEW
1614
  pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->resWinIdx * pAggSup->resultRowSize);
×
1615
  
NEW
1616
  qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->resWinIdx);
×
1617

NEW
1618
  if (reset) {
×
NEW
1619
    memset(pResultRow, 0, pAggSup->resultRowSize);
×
NEW
1620
    for (int32_t k = 0; k < pSupp->numOfExprs; ++k) {
×
NEW
1621
      SqlFunctionCtx* pCtx = &pSupp->pCtx[k];
×
NEW
1622
      pCtx->pOutput = NULL;
×
1623
    }
1624
  }
1625

NEW
1626
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
×
1627

1628
  // set time window for current result
NEW
1629
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
×
1630

NEW
1631
_exit:
×
1632
  
NEW
1633
  if (code) {
×
NEW
1634
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1635
  }
1636

NEW
1637
  return code;
×
1638
}
1639

NEW
1640
// Indefinite-rows counterpart of the result-block lookup: find (or create) the
// output block list of window pWin, bind the window's result row to the
// operator's expression contexts, and return the block / window-index array
// obtained from extWinGetLastBlockFromList() for `rows` more rows.
//
// A window that already owns a slot keeps its result row as is (reset=false);
// a window that is assigned a slot here gets its result row reset (reset=true).
//
// When a new slot is opened, both the slot pointer and the list allocation are
// validated before pWin->resWinIdx / pExtW->outWinIdx are modified, so a
// failure leaves the operator's bookkeeping unchanged.
static int32_t extWinGetSetWinResBlockBuf(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SList*                   pList = NULL;
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  
  if (pWin->resWinIdx >= 0) {
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->resWinIdx);
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, false));
  } else {
    if (extWinLastWinClosed(pExtW)) {
      pWin->resWinIdx = pExtW->outWinIdx - 1;
      pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->resWinIdx);
    } else {
      // Validate the destination slot before allocating, so nothing leaks if
      // the index is out of range.
      SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pExtW->outWinIdx);
      TSDB_CHECK_NULL(ppList, code, lino, _exit, terrno);

      // Each node holds two pointers (block + window-index array).
      pList = tdListNew(POINTER_BYTES * 2);
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);

      // Commit the new slot only after every fallible step succeeded.
      pWin->resWinIdx = pExtW->outWinIdx++;
      extWinRecycleBlockList(pExtW, ppList);
      *ppList = pList;
    }
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, true));
  }
  
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1673

1674

NEW
1675
// Run the indefinite-rows expressions for one window: copy `rows` rows of
// pInputBlock (from startPos) into the operator's reusable temporary block,
// evaluate the expressions into the window's result block, and record the
// window index for the rows.
static int32_t extWinIndefRowsDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SSDataBlock*             pOutBlock = NULL;
  SArray*                  pOutWinIdx = NULL;

  TAOS_CHECK_EXIT(extWinGetSetWinResBlockBuf(pOperator, rows, pWin, &pOutBlock, &pOutWinIdx));

  // Lazily create the temporary block on first use; afterwards just reset it.
  if (pExtW->pTmpBlock) {
    blockDataCleanup(pExtW->pTmpBlock);
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
  }

  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));

  TAOS_CHECK_EXIT(extWinIndefRowsDoImpl(pOperator, pOutBlock, pExtW->pTmpBlock));

  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pOutWinIdx, pOutBlock,
                                     extWinGetCurWinIdx(pOperator->pTaskInfo), rows));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1704

1705

NEW
1706
// Feed one input block through the indefinite-rows path: repeatedly ask the
// operator's window locator (getWinFp) for the next window covering rows of
// the block and process that row range, until the locator reports no window.
static int32_t extWinIndefRowsOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  int64_t*                 pTsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  bool                     isAscScan = (TSDB_ORDER_ASC == pExtW->binfo.inputTsOrder);
  SExtWinTimeWindow*       pCurWin = NULL;
  int32_t                  rowPos = 0;
  int32_t                  numRows = 0;

  for (;;) {
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, pTsCol, &rowPos, &pInputBlock->info, &pCurWin, &numRows));
    if (NULL == pCurWin) {
      break;  // no further window intersects this block
    }

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") indefRows start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pOperator->pTaskInfo), pCurWin->tw.skey, pCurWin->tw.ekey, isAscScan, rowPos, numRows);        

    TAOS_CHECK_EXIT(extWinIndefRowsDo(pOperator, pInputBlock, rowPos, numRows, pCurWin));

    // continue the search right after the rows just consumed
    rowPos += numRows;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1736

1737
// Hand out the next buffered result block of the non-aggregate paths.
//
// Walks the output slots from pExtW->outputWinId up to outWinIdx. Empty slots
// are skipped. From the first non-empty slot the head node is popped: its
// block becomes the result and its window-index array is published through the
// stream runtime's pStreamBlkWinIdx. Every slot that is (or becomes) empty
// advances both outputWinId and the runtime's current output window index.
//
// *ppRes receives the block when it holds at least one row, NULL otherwise.
// Always returns TSDB_CODE_SUCCESS.
static int32_t extWinNonAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExternalWindowOperator* pExtW = pOperator->info;
  int32_t                  slotEnd = pExtW->outWinIdx;
  SSDataBlock*             pOut = NULL;

  while (pExtW->outputWinId < slotEnd) {
    SList* pSlotList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
    bool   gotBlock = listNEles(pSlotList) > 0;

    if (gotBlock) {
      // Node payload: [0] = data block, [1] = window-index array.
      SListNode* pHead = tdListPopHead(pSlotList);
      pOut = *(SSDataBlock**)pHead->data;
      pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = *(SArray**)((SArray**)pHead->data + 1);
      pExtW->pLastBlkNode = pHead;
    }

    // The slot was empty from the start, or has just been drained.
    if (listNEles(pSlotList) <= 0) {
      pExtW->outputWinId++;
      extWinIncCurWinOutIdx(pOperator->pTaskInfo->pStreamRuntimeInfo);
    }

    if (gotBlock) {
      break;
    }
  }

  if (NULL == pOut) {
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = NULL;
    qDebug("%s ext window done", GET_TASKID(pOperator->pTaskInfo));
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s result generated, rows:%" PRId64 , GET_TASKID(pOperator->pTaskInfo), pOut->info.rows);
  pOut->info.version = pOperator->pTaskInfo->version;
  pOut->info.dataLoad = 1;

  *ppRes = (pOut->info.rows > 0) ? pOut : NULL;
  return TSDB_CODE_SUCCESS;
}
1782

1783
// Produce aggregate output for windows that received no input rows.
//
// Nothing is filled when the operator has no empty-input block, or when pWin
// is the same window (same skey/ekey) as the last one aggregated. Otherwise
// every window after lastWinId, up to the last window (allRemains) or up to
// the one before the current window, is aggregated once over the empty-input
// block, and lastWinId is advanced.
//
// On success the aggregate input is switched back to pBlock (when given) and,
// unless allRemains is set, the current window index is restored to the value
// it had on entry.
static int32_t extWinAggHandleEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains, SExtWinTimeWindow* pWin) {
  int32_t                  code = 0, lino = 0;
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
  SExprSupp*               pSup = &pOperator->exprSupp;
  int32_t                  savedWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
  SSDataBlock*             pEmptyInput = pExtW->pEmptyInputBlock;

  bool sameAsLastWin = (pWin && pWin->tw.skey == pExtW->lastSKey && pWin->tw.ekey == pExtW->lastEKey);

  if (NULL != pEmptyInput && !sameAsLastWin) {
    bool    ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
    int32_t lastEmptyIdx = allRemains ? (pExtW->pWins->size - 1) : (savedWinIdx - 1);
    int32_t firstEmptyIdx = pExtW->lastWinId + 1;

    // Switch the aggregate input only if there is at least one window to fill.
    if (firstEmptyIdx <= lastEmptyIdx) {
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pEmptyInput, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
    }

    for (int32_t idx = firstEmptyIdx; idx <= lastEmptyIdx; ++idx) {
      SExtWinTimeWindow* pEmptyWin = taosArrayGet(pExtW->pWins, idx);

      extWinSetCurWinIdx(pOperator, idx);
      qDebug("%s %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
             GET_TASKID(pOperator->pTaskInfo), idx, pEmptyWin->tw.skey, pEmptyWin->tw.ekey, ascScan);

      TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pEmptyWin, pSup, &pExtW->aggSup, pOperator->pTaskInfo));

      updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pEmptyWin->tw, 1);

      // lastWinId is advanced even when the aggregation itself reports an error.
      code = extWinAggDo(pOperator, 0, 1, pEmptyInput);
      pExtW->lastWinId = idx;
      TAOS_CHECK_EXIT(code);
    }
  }

_exit:

  // Re-attach the real input block, but only if everything so far succeeded.
  if (0 == code && pBlock) {
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    if (code) {
      lino = __LINE__;
    }
  }

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
    return code;
  }

  if (!allRemains) {
    extWinSetCurWinIdx(pOperator, savedWinIdx);  
  }

  return code;
}
1835

1836
// Feed one input block through the aggregate path.
//
// For every window the locator (getWinFp) finds in the block: fill preceding
// empty windows, run the scalar pre-expressions on the block once (on the
// first window found), bind the window's result row unless the window is the
// same as the previously aggregated one, aggregate the window's rows, and
// remember the window as the last one handled.
static int32_t extWinAggOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = 0, lino = 0;
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
  int32_t                  rowPos = 0;
  int32_t                  numRows = 0;
  int64_t*                 pTsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  bool                     isAscScan = (TSDB_ORDER_ASC == pExtW->binfo.inputTsOrder);
  SExtWinTimeWindow*       pCurWin = NULL;
  bool                     scalarDone = false;

  for (;;) {
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, pTsCol, &rowPos, &pInputBlock->info, &pCurWin, &numRows));
    if (NULL == pCurWin) {
      break;  // no further window intersects this block
    }

    TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pInputBlock, false, pCurWin));

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") agg start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pOperator->pTaskInfo), pCurWin->tw.skey, pCurWin->tw.ekey, isAscScan, rowPos, numRows);        

    if (!scalarDone) {
      SExprSupp* pPreSup = &pExtW->scalarSupp;
      if (pPreSup->pExprInfo) {
        TAOS_CHECK_EXIT(projectApplyFunctions(pPreSup->pExprInfo, pInputBlock, pInputBlock, pPreSup->pCtx,
                                              pPreSup->numOfExprs, pExtW->pPseudoColInfo,
                                              GET_STM_RTINFO(pOperator->pTaskInfo)));
      }

      scalarDone = true;
    }

    // A window whose skey is INT64_MIN is never treated as a continuation.
    bool continuesLastWin = (pCurWin->tw.skey == pExtW->lastSKey) && (pCurWin->tw.ekey == pExtW->lastEKey) &&
                            (pCurWin->tw.skey != INT64_MIN);
    if (!continuesLastWin) {
      TAOS_CHECK_EXIT(
          extWinAggSetWinOutputBuf(pOperator, pCurWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo));
    }

    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pCurWin->tw, 1);
    TAOS_CHECK_EXIT(extWinAggDo(pOperator, rowPos, numRows, pInputBlock));

    pExtW->lastSKey = pCurWin->tw.skey;
    pExtW->lastEKey = pCurWin->tw.ekey;
    pExtW->lastWinId = extWinGetCurWinIdx(pOperator->pTaskInfo);

    // continue the search right after the rows just consumed
    rowPos += numRows;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1888

1889
// Materialise aggregate results into the operator's result block.
//
// Starting at pExtW->outputWinId, each window that owns a result slot has its
// result row finalised and copied into the block, and the window index of the
// produced rows is appended to pExtW->pWinRowIdx. The loop stops early once
// the block reaches the operator's row threshold. The block is then filtered;
// if rows were dropped, the window-index array is rebuilt from the filter
// result (or cleared when no filter indicator is returned).
//
// *ppRes receives the block when it contains rows (with its window set to
// orgTableTimeRange), NULL otherwise. When a stream runtime is attached, the
// window-index array is published through pStreamBlkWinIdx.
//
// pFilterRes is declared and initialised before the first `goto _exit` so the
// cleanup code never sees an indeterminate pointer when an error occurs inside
// the window loop.
static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*             pBlock = pExtW->binfo.pRes;
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExprInfo*               pExprInfo = pOperator->exprSupp.pExprInfo;
  int32_t                  numOfExprs = pOperator->exprSupp.numOfExprs;
  int32_t*                 rowEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;
  SqlFunctionCtx*          pCtx = pOperator->exprSupp.pCtx;
  int32_t                  numOfWin = pExtW->outWinIdx;
  SColumnInfoData*         pFilterRes = NULL;
  int32_t                  rowsBeforeFilter = 0;

  pBlock->info.version = pTaskInfo->version;
  blockDataCleanup(pBlock);
  taosArrayClear(pExtW->pWinRowIdx);

  for (; pExtW->outputWinId < pExtW->pWins->size; ++pExtW->outputWinId) {
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->outputWinId);
    int32_t            winIdx = pWin->resWinIdx;
    if (winIdx < 0) {
      // window never received a result slot
      continue;
    }

    pExtW->outputWinNum++;
    // 64-bit offset: winIdx * resultRowSize may exceed INT32_MAX
    SResultRow* pRow = (SResultRow*)((char*)pExtW->pResultRow + (int64_t)winIdx * pExtW->aggSup.resultRowSize);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      continue;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      // reserve room for this row plus the slots not yet emitted
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + numOfWin - pExtW->outputWinNum;
      TAOS_CHECK_EXIT(blockDataEnsureCapacity(pBlock, newSize));
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
    }

    TAOS_CHECK_EXIT(copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo));

    pBlock->info.rows += pRow->numOfRows;

    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pRow->winIdx, pRow->numOfRows));

    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
      // the break skips the loop increment, so advance past this window here
      ++pExtW->outputWinId;
      break;
    }
  }

  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);

  rowsBeforeFilter = pBlock->info.rows;
  TAOS_CHECK_EXIT(doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, &pFilterRes));
  if (pBlock->info.rows < rowsBeforeFilter) {
    if (pFilterRes != NULL) {
      TAOS_CHECK_EXIT(extWinRebuildWinIdxByFilter(pTaskInfo, pExtW->pWinRowIdx, rowsBeforeFilter, pFilterRes));
    } else {
      // no indicator means all rows are filtered out by short-circuit path
      taosArrayClear(pExtW->pWinRowIdx);
    }
  }

  pBlock->info.dataLoad = 1;

  *ppRes = (pBlock->info.rows > 0) ? pBlock : NULL;

  if (*ppRes) {
    (*ppRes)->info.window.skey = pExtW->orgTableTimeRange.skey;
    (*ppRes)->info.window.ekey = pExtW->orgTableTimeRange.ekey;
  }
  if (pOperator->pTaskInfo->pStreamRuntimeInfo) {
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;
  }

_exit:
  colDataDestroy(pFilterRes);
  taosMemoryFree(pFilterRes);

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1978

1979
// Make sure pExtW->pResultRow can hold winNum result rows.
//
// Nothing is done in scalar mode or when the current capacity already covers
// winNum. Otherwise the old buffer is released and a zero-initialised buffer
// of winNum rows is allocated; resultRowCapacity stays at -1 if the allocation
// fails.
static int32_t extWinInitResultRow(SExecTaskInfo*        pTaskInfo, SExternalWindowOperator* pExtW, int32_t winNum) {
  int32_t code = 0, lino = 0;

  bool scalarMode = (EEXT_MODE_SCALAR == pExtW->mode);
  if (scalarMode || winNum <= pExtW->resultRowCapacity) {
    return TSDB_CODE_SUCCESS;
  }

  // Drop the undersized buffer; the old contents are not carried over.
  taosMemoryFreeClear(pExtW->pResultRow);
  pExtW->resultRowCapacity = -1;

  pExtW->pResultRow = taosMemoryCalloc(winNum, pExtW->aggSup.resultRowSize);
  QUERY_CHECK_NULL(pExtW->pResultRow, code, lino, _exit, terrno);

  pExtW->resultRowCapacity = winNum;

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
2006

2007
// Release oversized buffers held by the operator.
//
// The result-row buffer is freed (and its capacity reset to -1) when its size
// reaches 1 MiB. The columns of the result block are freed when
// rows * resultRowSize reaches the same threshold.
//
// Sizes are computed in 64-bit arithmetic: capacity * resultRowSize can exceed
// INT32_MAX, and a wrapped product would skip the release exactly for the
// largest buffers.
static void extWinFreeResultRow(SExternalWindowOperator* pExtW) {
  const int64_t releaseThreshold = 1048576;  // 1 MiB
  const int64_t rowSize = pExtW->aggSup.resultRowSize;

  if ((int64_t)pExtW->resultRowCapacity * rowSize >= releaseThreshold) {
    taosMemoryFreeClear(pExtW->pResultRow);
    pExtW->resultRowCapacity = -1;
  }
  if (pExtW->binfo.pRes && (int64_t)pExtW->binfo.pRes->info.rows * rowSize >= releaseThreshold) {
    blockDataFreeCols(pExtW->binfo.pRes);
  }
}
497,961✔
2016

2017
// Build the operator's window list from the stream runtime's trigger
// parameters (one window per entry of pStreamPesudoFuncVals).
//
// Does nothing if the list is already populated. Otherwise it reserves one
// window per parameter, sizes the result-row buffer accordingly, and fills the
// windows either via scalarCalculateExtWinsTimeRange() (when a time-range
// expression needs calculation) or directly from each parameter's
// wstart/wend. In the direct case ekey is wend + 1 unless the trigger type is
// STREAM_TRIGGER_SLIDING, and resWinIdx is initialised to -1.
// Finally the output cursor and block window start index are set to the
// runtime's curIdx and lastWinId is reset to -1.
static int32_t extWinInitWindowList(SExternalWindowOperator* pExtW, SExecTaskInfo*        pTaskInfo) {
  if (taosArrayGetSize(pExtW->pWins) > 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                 code = 0, lino = 0;
  SStreamRuntimeFuncInfo* pRtFunc = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
  size_t                  numWins = taosArrayGetSize(pRtFunc->pStreamPesudoFuncVals);

  SExtWinTimeWindow* pWinArr = taosArrayReserve(pExtW->pWins, numWins);
  TSDB_CHECK_NULL(pWinArr, code, lino, _exit, terrno);

  TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, numWins));

  bool calcRange = pExtW->timeRangeExpr && pExtW->timeRangeExpr->needCalc;
  if (!calcRange) {
    // +1 turns wend into the ekey stored for non-sliding triggers
    int64_t ekeyAdjust = (pRtFunc->triggerType != STREAM_TRIGGER_SLIDING) ? 1 : 0;

    for (int32_t i = 0; i < numWins; ++i) {
      SSTriggerCalcParam* pParam = taosArrayGet(pRtFunc->pStreamPesudoFuncVals, i);
      SExtWinTimeWindow*  pOne = &pWinArr[i];

      pOne->tw.skey = pParam->wstart;
      pOne->tw.ekey = pParam->wend + ekeyAdjust;
      pOne->resWinIdx = -1;

      qDebug("%s the %d/%d ext window initialized, TR[%" PRId64 ", %" PRId64 ")", 
          pTaskInfo->id.str, i, (int32_t)numWins, pOne->tw.skey, pOne->tw.ekey);
    }
  } else {
    TAOS_CHECK_EXIT(scalarCalculateExtWinsTimeRange(pExtW->timeRangeExpr, pRtFunc, pWinArr));

    // avoid walking the whole list when debug logging is off
    if (qDebugFlag & DEBUG_DEBUG) {
      for (int32_t i = 0; i < numWins; ++i) {
        qDebug("%s the %d/%d ext window calced initialized, TR[%" PRId64 ", %" PRId64 ")", 
            pTaskInfo->id.str, i, (int32_t)numWins, pWinArr[i].tw.skey, pWinArr[i].tw.ekey);
      }
    }
  }

  pExtW->outputWinId = pRtFunc->curIdx;
  pExtW->lastWinId = -1;
  pExtW->blkWinStartIdx = pRtFunc->curIdx;

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
2063

2064
// Tell whether the non-aggregate paths already hold a result block that can be
// emitted for the slot at pExtW->outputWinId.
//
// Returns false in multi-table mode with unordered input. Returns true when
// more than one slot is pending and no time-range expression needs
// calculation, when the current slot holds more than one block, or when its
// single block's last recorded window index lies before blkWinStartIdx.
static bool extWinNonAggGotResBlock(SExternalWindowOperator* pExtW) {
  if (pExtW->multiTableMode && !pExtW->inputHasOrder) {
    return false;
  }

  int32_t pendingSlots = pExtW->outWinIdx - pExtW->outputWinId;
  bool    needCalcRange = (NULL != pExtW->timeRangeExpr) && pExtW->timeRangeExpr->needCalc;
  if (pendingSlots > 1 && !needCalcRange) {
    return true;
  }

  SList* pSlotList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
  if (NULL == pSlotList || listNEles(pSlotList) <= 0) {
    return false;
  }
  if (listNEles(pSlotList) > 1) {
    return true;
  }

  // Exactly one block: node payload is [0] = data block, [1] = window-index array.
  SListNode* pHead = listHead(pSlotList);
  SArray*    pBlkWinIdx = *(SArray**)((SArray**)pHead->data + 1);
  int32_t*   pLastWinIdx = taosArrayGetLast(pBlkWinIdx);
  if (NULL == pLastWinIdx) {
    return false;
  }

  return *pLastWinIdx < pExtW->blkWinStartIdx;
}
2090

NEW
2091
/*
 * Read the timestamp of the first and of the last row of a data block.
 *
 * The timestamp column is the first column whose info.colId equals tsSlotId;
 * when no column matches, the last column of the block is used instead.
 *
 * @param pBlock    block to inspect; must contain at least one row
 * @param tsSlotId  id compared against each column's info.colId
 * @param startTs   [out] timestamp stored in row 0
 * @param endTs     [out] timestamp stored in row (rows - 1)
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA for a block
 *         without rows, or terrno when a column cannot be fetched
 */
static int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int32_t          tsIndex = -1;
  int32_t          numOfCols = 0;
  SColumnInfoData *pColData = NULL;

  // an empty block has no last row: reading row (rows - 1) would be out of bounds
  QUERY_CHECK_CONDITION(pBlock->info.rows > 0, code, lino, _return, TSDB_CODE_INVALID_PARA);

  numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; i++) {
    SColumnInfoData *pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pCol, code, lino, _return, terrno);
    if (pCol->info.colId == tsSlotId) {
      tsIndex = i;
      break;
    }
  }

  // no matching column: fall back to the last column of the block
  if (tsIndex == -1) {
    tsIndex = numOfCols - 1;
  }

  pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsIndex);
  QUERY_CHECK_NULL(pColData, code, lino, _return, terrno);

  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, 0), 0);
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, pBlock->info.rows - 1), 0);

  return code;

_return:
  qError("failed to get time window of block, %s code:%s, line:%d", __func__, tstrerror(code), lino);
  return code;
}
2119

2120
/*
 * Open the external window operator: prepare the window list and consume
 * blocks from the single downstream operator.
 *
 * When a get-param is attached to the operator, the window array it carries
 * is taken over, the per-run cursors are reset and the operator is switched
 * to dynamic-window mode; otherwise the window list is built through
 * extWinInitWindowList().
 *
 * Each downstream block is dispatched according to pExtW->mode. In the
 * scalar and indefinite-rows modes the function returns as soon as
 * extWinNonAggGotResBlock() reports a ready result; on that path the operator
 * is NOT marked as opened.
 * NOTE(review): presumably so that the next call resumes reading downstream -- confirm.
 *
 * @param pOperator  the external window operator
 * @return TSDB_CODE_SUCCESS; on failure the error is stored in
 *         pTaskInfo->code and control leaves through T_LONG_JMP
 */
static int32_t extWinOpen(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator) && !pOperator->pOperatorGetParam) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SExternalWindowOperator* pExtW = pOperator->info;

  if (pOperator->pOperatorGetParam) {
    SOperatorParam*               pParam = (SOperatorParam*)(pOperator->pOperatorGetParam);
    SExternalWindowOperatorParam* pExtPram = (SExternalWindowOperatorParam*)pParam->value;
    SOperatorParam*               pDownParam = NULL;
    SExchangeOperatorParam*       pExecParam = NULL;

    if (pExtW->pWins) {
      taosArrayDestroy(pExtW->pWins);
    }

    // Transfer ownership of the window array in one step. Clearing the source
    // pointer before any call that may fail guarantees the array is never
    // reachable from both the operator and the param on an error path.
    pExtW->pWins = pExtPram->ExtWins;
    pExtPram->ExtWins = NULL;

    TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, taosArrayGetSize(pExtW->pWins)));

    // reset the per-run cursors and the observed time range
    pExtW->outputWinId = 0;
    pExtW->lastWinId = -1;
    pExtW->blkWinStartIdx = 0;
    pExtW->outWinIdx = 0;
    pExtW->lastSKey = INT64_MIN;
    pExtW->lastEKey = INT64_MIN;
    pExtW->isDynWindow = true;
    pExtW->orgTableTimeRange.skey = INT64_MAX;
    pExtW->orgTableTimeRange.ekey = INT64_MIN;

    QUERY_CHECK_CONDITION(pOperator->numOfDownstream == 1, code, lino, _exit, TSDB_CODE_INVALID_PARA);

    pDownParam = (SOperatorParam*)(pOperator->pDownstreamGetParams[0]);
    switch (pDownParam->opType) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
        pExecParam = (SExchangeOperatorParam*)pDownParam->value;
        if (!pExecParam->multiParams) {
          // point the exchange at the original table recorded on this operator
          pExecParam->basic.vgId = pExtW->orgTableVgId;
          taosArrayClear(pExecParam->basic.uidList);
          QUERY_CHECK_NULL(taosArrayPush(pExecParam->basic.uidList, &pExtW->orgTableUid), code, lino, _exit, terrno);
        }
        break;
      }
      default:
        break;
    }

    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
    pOperator->pOperatorGetParam = NULL;
  } else {
    TAOS_CHECK_EXIT(extWinInitWindowList(pExtW, pTaskInfo));
  }

  while (1) {
    // per-block cursors
    pExtW->blkWinIdx = -1;
    pExtW->blkWinStartSet = false;
    pExtW->blkRowStartIdx = 0;

    SSDataBlock* pBlock = getNextBlockFromDownstreamRemainDetach(pOperator, 0);
    if (pBlock == NULL) {
      // downstream exhausted
      if (EEXT_MODE_AGG == pExtW->mode) {
        TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pBlock, true, NULL));
      }
      pExtW->blkWinStartIdx = pExtW->pWins->size;
      break;
    }

    if (pExtW->isDynWindow) {
      // widen the observed time range with this block's first/last timestamp
      TSKEY skey = 0;
      TSKEY ekey = 0;
      code = getTimeWindowOfBlock(pBlock, pExtW->primaryTsIndex, &skey, &ekey);
      QUERY_CHECK_CODE(code, lino, _exit);
      pExtW->orgTableTimeRange.skey = TMIN(pExtW->orgTableTimeRange.skey, skey);
      pExtW->orgTableTimeRange.ekey = TMAX(pExtW->orgTableTimeRange.ekey, ekey);
    }

    printDataBlock(pBlock, __func__, pTaskInfo->id.str, pTaskInfo->id.queryId);

    qDebug("ext window mode:%d got %" PRId64 " rows from downstream", pExtW->mode, pBlock->info.rows);

    switch (pExtW->mode) {
      case EEXT_MODE_SCALAR:
        TAOS_CHECK_EXIT(extWinProjectOpen(pOperator, pBlock));
        if (extWinNonAggGotResBlock(pExtW)) {
          return code;
        }
        break;
      case EEXT_MODE_AGG:
        TAOS_CHECK_EXIT(extWinAggOpen(pOperator, pBlock));
        break;
      case EEXT_MODE_INDEFR_FUNC:
        TAOS_CHECK_EXIT(extWinIndefRowsOpen(pOperator, pBlock));
        if (extWinNonAggGotResBlock(pExtW)) {
          return code;
        }
        break;
      default:
        break;
    }
  }

  if (pOperator->pOperatorGetParam) {
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
    pOperator->pOperatorGetParam = NULL;
  }
  OPTR_SET_OPENED(pOperator);

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return code;
}
2253

2254
/*
 * Return the next result block of the external window operator.
 *
 * A finished operator yields NULL unless a get-param is attached, in which
 * case its status is put back to OP_NOT_OPENED so that it is opened again.
 * Blocks that carry no rows are reported to the caller as NULL.
 *
 * @param pOperator  the external window operator
 * @param ppRes      [out] next result block, or NULL when nothing is produced
 * @return TSDB_CODE_SUCCESS; on failure the error is stored in
 *         pTaskInfo->code and control leaves through T_LONG_JMP
 */
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SExternalWindowOperator* pExtW = pOperator->info;

  if (pOperator->status == OP_EXEC_DONE) {
    if (!pOperator->pOperatorGetParam) {
      *ppRes = NULL;
      return code;
    }
    // a pending get-param re-arms a finished operator
    pOperator->status = OP_NOT_OPENED;
  }

  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);

  if (pOperator->status == OP_NOT_OPENED) {
    TAOS_CHECK_EXIT(pOperator->fpSet._openFn(pOperator));
  }

  switch (pExtW->mode) {
    case EEXT_MODE_SCALAR:
    case EEXT_MODE_INDEFR_FUNC:
      TAOS_CHECK_EXIT(extWinNonAggOutputRes(pOperator, ppRes));
      if (NULL == *ppRes) {
        setOperatorCompleted(pOperator);
        extWinFreeResultRow(pExtW);
      }
      break;

    default:
      // keep asking for output until a block shows up or the output cursor
      // has moved past the last window
      do {
        TAOS_CHECK_EXIT(extWinAggOutputRes(pOperator, ppRes));
      } while (NULL == *ppRes && pExtW->outputWinId < pExtW->pWins->size);

      if (NULL == *ppRes) {
        setOperatorCompleted(pOperator);
        if (pTaskInfo->pStreamRuntimeInfo) {
          extWinFreeResultRow(pExtW);
        }
      }
      break;
  }

  if (*ppRes) {
    printDataBlock(*ppRes, __func__, GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
  }

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // never hand a block without rows to the caller
  if ((*ppRes) && (*ppRes)->info.rows <= 0) {
    *ppRes = NULL;
  }

  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM && (*ppRes)) {
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
  }

  return code;
}
2332

2333
/*
 * Create an external window operator on top of pDownstream.
 *
 * The working mode is derived from the physical plan node: projections
 * (window.pProjs) select the scalar mode, window.indefRowsFunc selects the
 * indefinite-rows mode, anything else is aggregation. Each mode sets up its
 * own expression supports and buffers.
 *
 * @param pDownstream  the single downstream operator
 * @param pNode        physical plan node, read as SExternalWindowPhysiNode
 * @param pTaskInfo    owning task; its code field receives the error on failure
 * @param pOptrOut     [out] the created operator; written only on success
 * @return TSDB_CODE_SUCCESS or an error code; on failure the partially built
 *         operator is destroyed through destroyOperatorAndDownstreams()
 */
int32_t createStreamExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
                                           SOperatorInfo** pOptrOut) {
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
  QRY_PARAM_CHECK(pOptrOut);
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (!pExtW || !pOperator) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  // pOperator may only be touched once both allocations are known to be valid
  pOperator->pPhyNode = pNode;
  initOperatorCostInfo(pOperator);

  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
                  pExtW, pTaskInfo);

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pExtW->binfo, pResBlock);

  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
  // the input order is forced to ascending, on the plan node as well
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
  pExtW->isDynWindow = false;

  if (pTaskInfo->pStreamRuntimeInfo != NULL) {
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
  }

  if (pPhynode->window.pProjs) {
    // scalar (projection) mode
    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);

    // QUERY_CHECK_NULL stores terrno into code, so a failed allocation can
    // never reach _error with code still 0
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
    QUERY_CHECK_NULL(pExtW->pOutputBlocks, code, lino, _error, terrno);

    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
  } else if (pExtW->mode == EEXT_MODE_AGG) {
    // aggregation mode
    if (pPhynode->window.pExprs != NULL) {
      int32_t    numOfScalar = 0;
      SExprInfo* pSExpr = NULL;
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &numOfScalar);
      QUERY_CHECK_CODE(code, lino, _error);

      code = initExprSupp(&pExtW->scalarSupp, pSExpr, numOfScalar, &pTaskInfo->storageAPI.functionStore);
      QUERY_CHECK_CODE(code, lino, _error);
      checkIndefRowsFuncs(&pExtW->scalarSupp);
    }

    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
    initResultSizeInfo(&pOperator->resultInfo, 4096);

    pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
    TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);

    int32_t    numOfFuncs = 0;
    SExprInfo* pExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfFuncs);
    QUERY_CHECK_CODE(code, lino, _error);
    pOperator->exprSupp.hasWindow = true;
    pOperator->exprSupp.hasWindowOrGroup = true;
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfFuncs, keyBufSize, pTaskInfo->id.str, 0, 0);
    QUERY_CHECK_CODE(code, lino, _error);

    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                              pTaskInfo->pStreamRuntimeInfo);
    QUERY_CHECK_CODE(code, lino, _error);

    nodesWalkExprs(pPhynode->window.pFuncs, extWinHasCountLikeFunc, &pExtW->hasCountFunc);
    if (pExtW->hasCountFunc) {
      code = extWinCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
      QUERY_CHECK_CODE(code, lino, _error);
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
    } else {
      qDebug("%s ext window doesn't have CountLikeFunc", pOperator->pTaskInfo->id.str);
    }

    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
    QUERY_CHECK_CODE(code, lino, _error);

    pExtW->lastSKey = INT64_MIN;
    pExtW->lastEKey = INT64_MIN;
  } else {
    // indefinite-rows function mode
    size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

    if (pPhynode->window.pExprs != NULL) {
      int32_t    numOfScalar = 0;
      SExprInfo* pSExpr = NULL;
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &numOfScalar);
      QUERY_CHECK_CODE(code, lino, _error);

      code = initExprSupp(&pExtW->scalarSupp, pSExpr, numOfScalar, &pTaskInfo->storageAPI.functionStore);
      QUERY_CHECK_CODE(code, lino, _error);
    }

    int32_t    numOfExpr = 0;
    SExprInfo* pExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
    TSDB_CHECK_CODE(code, lino, _error);

    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
                      NULL, &pTaskInfo->storageAPI.functionStore);
    TSDB_CHECK_CODE(code, lino, _error);
    pOperator->exprSupp.hasWindowOrGroup = false;

    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                              pTaskInfo->pStreamRuntimeInfo);
    TSDB_CHECK_CODE(code, lino, _error);

    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
    TSDB_CHECK_CODE(code, lino, _error);

    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
    QUERY_CHECK_NULL(pExtW->pOutputBlocks, code, lino, _error, terrno);

    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
  }

  pExtW->pWins = taosArrayInit(4096, sizeof(SExtWinTimeWindow));
  QUERY_CHECK_NULL(pExtW->pWins, code, lino, _error, terrno);

  pExtW->timeRangeExpr = (STimeRangeNode*)pPhynode->pTimeRange;
  if (pExtW->timeRangeExpr) {
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pStart, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pEnd, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
  }

  // The overlapping-window lookup is needed when a time range expression is
  // present and either it must be calculated or the stream runtime asks for
  // sliding overlap. pStreamRuntimeInfo is nullable (see the check above), so
  // it is tested before its options are read.
  bool needOvlpWin = false;
  if (pExtW->timeRangeExpr != NULL) {
    needOvlpWin = pExtW->timeRangeExpr->needCalc ||
                  (pTaskInfo->pStreamRuntimeInfo != NULL &&
                   (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP));
  }

  if (pPhynode->isSingleTable) {
    pExtW->getWinFp = needOvlpWin ? extWinGetOvlpWin : extWinGetNoOvlpWin;
    pExtW->multiTableMode = false;
  } else {
    pExtW->getWinFp = needOvlpWin ? extWinGetMultiTbOvlpWin : extWinGetMultiTbNoOvlpWin;
    pExtW->multiTableMode = true;
  }
  pExtW->inputHasOrder = pPhynode->inputHasOrder;
  pExtW->orgTableUid = pPhynode->orgTableUid;
  pExtW->orgTableVgId = pPhynode->orgTableVgId;

  pOperator->fpSet = createOperatorFpSet(extWinOpen, extWinNext, NULL, destroyExternalWindowOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
  code = appendDownstream(pOperator, &pDownstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrOut = pOperator;
  return code;

_error:

  if (pExtW != NULL) {
    destroyExternalWindowOperatorInfo(pExtW);
  }

  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc