• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4975

05 Mar 2026 08:43AM UTC coverage: 68.37% (+0.7%) from 67.664%
#4975

push

travis-ci

web-flow
merge: from main to 3.0 branch #34682

250 of 345 new or added lines in 28 files covered. (72.46%)

446 existing lines in 120 files now uncovered.

210600 of 308032 relevant lines covered (68.37%)

130326818.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.15
/source/libs/executor/src/externalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22
#include "cmdnodes.h"
23

24
typedef struct SBlockList {
25
  const SSDataBlock* pSrcBlock;
26
  SList*             pBlocks;
27
  int32_t            blockRowNumThreshold;
28
} SBlockList;
29

30

31
typedef int32_t (*extWinGetWinFp)(SOperatorInfo*, int64_t*, int32_t*, SDataBlockInfo*, SExtWinTimeWindow**, int32_t*);
32

33
typedef struct SExtWindowStat {
34
  int64_t resBlockCreated;
35
  int64_t resBlockDestroyed;
36
  int64_t resBlockRecycled;
37
  int64_t resBlockReused;
38
  int64_t resBlockAppend;
39
} SExtWindowStat;
40

41
typedef struct SExternalWindowOperator {
42
  SOptrBasicInfo     binfo;
43
  SExprSupp          scalarSupp;
44
  int32_t            primaryTsIndex;
45
  EExtWinMode        mode;
46
  bool               multiTableMode;
47
  bool               inputHasOrder;
48
  SArray*            pWins;           // SArray<SExtWinTimeWindow>
49
  SArray*            pPseudoColInfo;  
50
  STimeRangeNode*    timeRangeExpr;
51
  
52
  extWinGetWinFp     getWinFp;
53

54
  bool               blkWinStartSet;
55
  int32_t            blkWinStartIdx;
56
  int32_t            blkWinIdx;
57
  int32_t            blkRowStartIdx;
58
  int32_t            outputWinId;
59
  int32_t            outputWinNum;
60
  int32_t            outWinIdx;
61

62
  // for project&indefRows
63
  SList*             pFreeBlocks;    // SList<SSDatablock*+SAarray*>
64
  SArray*            pOutputBlocks;  // SArray<SList*>, for each window, we have a list of blocks
65
  SListNode*         pLastBlkNode; 
66
  SSDataBlock*       pTmpBlock;
67
  
68
  // for agg
69
  SAggSupporter      aggSup;
70
  STimeWindowAggSupp twAggSup;
71

72
  int32_t            resultRowCapacity;
73
  SResultRow*        pResultRow;
74

75
  int64_t            lastSKey;
76
  int32_t            lastWinId;
77
  SSDataBlock*       pEmptyInputBlock;
78
  bool               hasCountFunc;
79
  SExtWindowStat     stat;
80
  SArray*            pWinRowIdx;
81

82
  // for vtable window query
83
  bool               isDynWindow;
84
  int32_t            orgTableVgId;
85
  tb_uid_t           orgTableUid;
86
  STimeWindow        orgTableTimeRange;
87
} SExternalWindowOperator;
88

89

90
static int32_t extWinBlockListAddBlock(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
1,168✔
91
  SSDataBlock* pRes = NULL;
1,168✔
92
  int32_t code = 0, lino = 0;
1,168✔
93

94
  if (listNEles(pExtW->pFreeBlocks) > 0) {
1,168✔
95
    SListNode* pNode = tdListPopHead(pExtW->pFreeBlocks);
×
96
    *ppBlock = *(SSDataBlock**)pNode->data;
×
97
    *ppIdx = *(SArray**)((SArray**)pNode->data + 1);
×
98
    tdListAppendNode(pList, pNode);
×
99
    pExtW->stat.resBlockReused++;
×
100
  } else {
101
    TAOS_CHECK_EXIT(createOneDataBlock(pExtW->binfo.pRes, false, &pRes));
1,168✔
102
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, TMAX(rows, 4096)));
1,168✔
103
    SArray* pIdx = taosArrayInit(10, sizeof(int64_t));
1,168✔
104
    TSDB_CHECK_NULL(pIdx, code, lino, _exit, terrno);
1,168✔
105
    void* res[2] = {pRes, pIdx};
1,168✔
106
    TAOS_CHECK_EXIT(tdListAppend(pList, res));
1,168✔
107

108
    *ppBlock = pRes;
1,168✔
109
    *ppIdx = pIdx;
1,168✔
110
    pExtW->stat.resBlockCreated++;
1,168✔
111
  }
112
  
113
_exit:
1,168✔
114

115
  if (code) {
1,168✔
116
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
117
    blockDataDestroy(pRes);
×
118
  }
119
  
120
  return code;
1,168✔
121
}
122

123
static int32_t extWinGetLastBlockFromList(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
2,506✔
124
  int32_t    code = 0, lino = 0;
2,506✔
125
  SSDataBlock* pRes = NULL;
2,506✔
126

127
  SListNode* pNode = TD_DLIST_TAIL(pList);
2,506✔
128
  if (NULL == pNode) {
2,506✔
129
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
1,168✔
130
    return code;
1,168✔
131
  }
132

133
  pRes = *(SSDataBlock**)pNode->data;
1,338✔
134
  if ((pRes->info.rows + rows) > pRes->info.capacity) {
1,338✔
135
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
×
136
    return code;
×
137
  }
138

139
  *ppIdx = *(SArray**)((SSDataBlock**)pNode->data + 1);
1,338✔
140
  *ppBlock = pRes;
1,338✔
141
  pExtW->stat.resBlockAppend++;
1,338✔
142

143
_exit:
1,338✔
144

145
  if (code) {
1,338✔
146
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
147
  }
148
  
149
  return code;
1,338✔
150
}
151

152
static void extWinDestroyBlockList(void* p) {
38,584,320✔
153
  if (NULL == p) {
38,584,320✔
154
    return;
×
155
  }
156

157
  SListNode* pTmp = NULL;
38,584,320✔
158
  SList** ppList = (SList**)p;
38,584,320✔
159
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
38,584,320✔
160
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
161
    while (pNode) {
×
162
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
163
      blockDataDestroy(pBlock);
×
164
      SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
×
165
      taosArrayDestroy(pIdx);
×
166
      pTmp = pNode;
×
167
      pNode = pNode->dl_next_;
×
168
      taosMemoryFree(pTmp);
×
169
    }
170
  }
171
  taosMemoryFree(*ppList);
38,584,320✔
172
}
173

174

175
static void extWinRecycleBlkNode(SExternalWindowOperator* pExtW, SListNode** ppNode) {
2,202,846✔
176
  if (NULL == ppNode || NULL == *ppNode) {
2,202,846✔
177
    return;
2,202,844✔
178
  }
179

180
  SSDataBlock* pBlock = *(SSDataBlock**)(*ppNode)->data;
2✔
181
  SArray* pIdx = *(SArray**)((SArray**)(*ppNode)->data + 1);
226✔
182
  
183
  if (listNEles(pExtW->pFreeBlocks) >= 10) {
226✔
184
    blockDataDestroy(pBlock);
×
185
    taosArrayDestroy(pIdx);
×
186
    taosMemoryFreeClear(*ppNode);
×
187
    pExtW->stat.resBlockDestroyed++;
×
188
    return;
×
189
  }
190
  
191
  blockDataCleanup(pBlock);
226✔
192
  taosArrayClear(pIdx);
226✔
193
  tdListPrependNode(pExtW->pFreeBlocks, *ppNode);
226✔
194
  *ppNode = NULL;
226✔
195
  pExtW->stat.resBlockRecycled++;
226✔
196
}
197

198
static void extWinRecycleBlockList(SExternalWindowOperator* pExtW, void* p) {
1,168✔
199
  if (NULL == p) {
1,168✔
200
    return;
×
201
  }
202

203
  SListNode* pTmp = NULL;
1,168✔
204
  SList** ppList = (SList**)p;
1,168✔
205
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
1,168✔
206
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
207
    while (pNode) {
×
208
      pTmp = pNode;
×
209
      pNode = pNode->dl_next_;
×
210
      extWinRecycleBlkNode(pExtW, &pTmp);
×
211
    }
212
  }
213
  taosMemoryFree(*ppList);
1,168✔
214
}
215
static void extWinDestroyBlkNode(SExternalWindowOperator* pInfo, SListNode* pNode) {
1,555,491✔
216
  if (NULL == pNode) {
1,555,491✔
217
    return;
1,554,323✔
218
  }
219

220
  SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
1,168✔
221
  SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
1,168✔
222
  
223
  blockDataDestroy(pBlock);
1,168✔
224
  taosArrayDestroy(pIdx);
1,168✔
225

226
  taosMemoryFree(pNode);
1,168✔
227

228
  pInfo->stat.resBlockDestroyed++;
1,168✔
229
}
230

231

232
void destroyExternalWindowOperatorInfo(void* param) {
1,555,265✔
233
  if (NULL == param) {
1,555,265✔
234
    return;
×
235
  }
236
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
1,555,265✔
237
  cleanupBasicInfo(&pInfo->binfo);
1,555,265✔
238

239
  taosArrayDestroyEx(pInfo->pOutputBlocks, extWinDestroyBlockList);
1,555,265✔
240
  taosArrayDestroy(pInfo->pWins);
1,555,265✔
241
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,555,265✔
242
  taosArrayDestroy(pInfo->pWinRowIdx);
1,555,265✔
243
  
244
  taosArrayDestroy(pInfo->pPseudoColInfo);
1,555,265✔
245
  blockDataDestroy(pInfo->pTmpBlock);
1,555,265✔
246
  blockDataDestroy(pInfo->pEmptyInputBlock);
1,555,265✔
247

248
  extWinDestroyBlkNode(pInfo, pInfo->pLastBlkNode);
1,555,265✔
249
  if (pInfo->pFreeBlocks) {
1,555,265✔
250
    SListNode *node;
251
    while ((node = TD_DLIST_HEAD(pInfo->pFreeBlocks)) != NULL) {
1,168✔
252
      TD_DLIST_POP(pInfo->pFreeBlocks, node);
226✔
253
      extWinDestroyBlkNode(pInfo, node);
226✔
254
    }
255
    taosMemoryFree(pInfo->pFreeBlocks);
942✔
256
  }
257
  
258
  cleanupAggSup(&pInfo->aggSup);
1,555,265✔
259
  cleanupExprSupp(&pInfo->scalarSupp);
1,555,265✔
260
  taosMemoryFreeClear(pInfo->pResultRow);
1,555,265✔
261

262
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
1,555,265✔
263

264
  qDebug("ext window stat at destroy, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
1,555,265✔
265
      pInfo->stat.resBlockCreated, pInfo->stat.resBlockDestroyed, pInfo->stat.resBlockRecycled, 
266
      pInfo->stat.resBlockReused, pInfo->stat.resBlockAppend);
267

268
  taosMemoryFreeClear(pInfo);
1,555,265✔
269
}
270

271
static int32_t extWinOpen(SOperatorInfo* pOperator);
272
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
273

274
typedef struct SMergeAlignedExternalWindowOperator {
275
  SExternalWindowOperator* pExtW;
276
  int64_t curTs;
277
  SResultRow*  pResultRow;
278
} SMergeAlignedExternalWindowOperator;
279

280
void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
15,514✔
281
  SMergeAlignedExternalWindowOperator* pMlExtInfo = (SMergeAlignedExternalWindowOperator*)pOperator;
15,514✔
282
  destroyExternalWindowOperatorInfo(pMlExtInfo->pExtW);
15,514✔
283
  taosMemoryFreeClear(pMlExtInfo);
15,514✔
284
}
15,514✔
285

286
int64_t* extWinExtractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
5,829,146✔
287
  TSKEY* tsCols = NULL;
5,829,146✔
288

289
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
5,829,146✔
290
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
5,830,930✔
291
    if (!pColDataInfo) {
5,830,930✔
292
      pTaskInfo->code = terrno;
×
293
      T_LONG_JMP(pTaskInfo->env, terrno);
×
294
    }
295

296
    tsCols = (int64_t*)pColDataInfo->pData;
5,830,930✔
297
    if (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0) {
5,830,930✔
298
      int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
5,830,261✔
299
      if (code != TSDB_CODE_SUCCESS) {
5,830,499✔
300
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
669✔
301
        pTaskInfo->code = code;
669✔
302
        T_LONG_JMP(pTaskInfo->env, code);
×
303
      }
304
    }
305
  }
306

307
  return tsCols;
5,830,499✔
308
}
309

310
static int32_t extWinGetCurWinIdx(SExecTaskInfo* pTaskInfo) {
2,147,483,647✔
311
  if (!pTaskInfo->pStreamRuntimeInfo) {
2,147,483,647✔
312
    return 0;
2,147,483,647✔
313
  }
314
  return pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
55,405,421✔
315
}
316

317
static void extWinIncCurWinIdx(SOperatorInfo* pOperator) {
×
318
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
319
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx++;
×
320
}
×
321

322
static void extWinSetCurWinIdx(SOperatorInfo* pOperator, int32_t idx) {
2,147,483,647✔
323
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,147,483,647✔
324
  if (pTaskInfo->pStreamRuntimeInfo) {
2,147,483,647✔
325
    pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
40,501,623✔
326
  }
327
}
2,147,483,647✔
328

329

330
static void extWinIncCurWinOutIdx(SStreamRuntimeInfo* pStreamRuntimeInfo) {
1,168✔
331
  pStreamRuntimeInfo->funcInfo.curOutIdx++;
1,168✔
332
}
1,168✔
333

334

335
static const STimeWindow* extWinGetNextWin(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
×
336
  int32_t curIdx = extWinGetCurWinIdx(pTaskInfo);
×
337
  if (curIdx + 1 >= pExtW->pWins->size) return NULL;
×
338
  return taosArrayGet(pExtW->pWins, curIdx + 1);
×
339
}
340

341

342
static int32_t extWinAppendWinIdx(SExecTaskInfo*       pTaskInfo, SArray* pIdx, SSDataBlock* pBlock, int32_t currWinIdx, int32_t rows) {
2,147,483,647✔
343
  int32_t  code = 0, lino = 0;
2,147,483,647✔
344
  int64_t* lastRes = taosArrayGetLast(pIdx);
2,147,483,647✔
345
  int32_t* lastWinIdx = (int32_t*)lastRes;
2,147,483,647✔
346
  int32_t* lastRowIdx = lastWinIdx ? (lastWinIdx + 1) : NULL;
2,147,483,647✔
347
  int64_t  res = 0;
2,147,483,647✔
348
  int32_t* pWinIdx = (int32_t*)&res;
2,147,483,647✔
349
  int32_t* pRowIdx = pWinIdx + 1;
2,147,483,647✔
350

351
  if (lastWinIdx && *lastWinIdx == currWinIdx) {
2,147,483,647✔
352
    return code;
2,147,483,647✔
353
  }
354

355
  *pWinIdx = currWinIdx;
16,618,066✔
356
  *pRowIdx = pBlock->info.rows - rows;
16,616,129✔
357

358
  TSDB_CHECK_NULL(taosArrayPush(pIdx, &res), code, lino, _exit, terrno);
16,613,537✔
359

360
_exit:
16,613,537✔
361

362
  if (code) {
16,612,673✔
363
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
364
  }
365

366
  return code;
16,609,649✔
367
}
368

369
static int32_t extWinRebuildWinIdxByFilter(SExecTaskInfo* pTaskInfo, SArray* pIdx, int32_t rowsBeforeFilter,
224✔
370
                                           SColumnInfoData* pFilterRes) {
371
  int32_t code = TSDB_CODE_SUCCESS;
224✔
372
  int32_t lino = 0;
224✔
373

374
  if (pIdx == NULL || pFilterRes == NULL || rowsBeforeFilter <= 0 || taosArrayGetSize(pIdx) == 0) {
224✔
NEW
375
    return code;
×
376
  }
377

378
  int32_t idxSize = (int32_t)taosArrayGetSize(pIdx);
224✔
379
  SArray* pNewIdx = taosArrayInit(idxSize, sizeof(int64_t));
224✔
380
  TSDB_CHECK_NULL(pNewIdx, code, lino, _exit, terrno);
224✔
381

382
  int8_t* pIndicator = (int8_t*)pFilterRes->pData;
224✔
383
  int32_t newRowStart = 0;
224✔
384
  for (int32_t i = 0; i < idxSize; ++i) {
448✔
385
    int64_t cur = *(int64_t*)taosArrayGet(pIdx, i);
224✔
386
    int32_t* pCurWinIdx = (int32_t*)&cur;
224✔
387
    int32_t* pCurRowIdx = pCurWinIdx + 1;
224✔
388

389
    int32_t startRow = *pCurRowIdx;
224✔
390
    int32_t endRow = rowsBeforeFilter;
224✔
391
    if (i + 1 < idxSize) {
224✔
NEW
392
      int64_t next = *(int64_t*)taosArrayGet(pIdx, i + 1);
×
NEW
393
      int32_t* pNextWinIdx = (int32_t*)&next;
×
NEW
394
      int32_t* pNextRowIdx = pNextWinIdx + 1;
×
NEW
395
      endRow = *pNextRowIdx;
×
396
    }
397

398
    startRow = TMIN(TMAX(startRow, 0), rowsBeforeFilter);
224✔
399
    endRow = TMIN(TMAX(endRow, 0), rowsBeforeFilter);
224✔
400
    if (endRow <= startRow) {
224✔
NEW
401
      continue;
×
402
    }
403

404
    int32_t survivedRows = 0;
224✔
405
    for (int32_t r = startRow; r < endRow; ++r) {
448✔
406
      if (pIndicator[r]) {
224✔
NEW
407
        survivedRows++;
×
408
      }
409
    }
410

411
    if (survivedRows <= 0) {
224✔
412
      continue;
224✔
413
    }
414

NEW
415
    int64_t out = 0;
×
NEW
416
    int32_t* pOutWinIdx = (int32_t*)&out;
×
NEW
417
    int32_t* pOutRowIdx = pOutWinIdx + 1;
×
NEW
418
    *pOutWinIdx = *pCurWinIdx;
×
NEW
419
    *pOutRowIdx = newRowStart;
×
NEW
420
    TSDB_CHECK_NULL(taosArrayPush(pNewIdx, &out), code, lino, _exit, terrno);
×
421

NEW
422
    newRowStart += survivedRows;
×
423
  }
424

425
  taosArrayClear(pIdx);
224✔
426
  int32_t newSize = (int32_t)taosArrayGetSize(pNewIdx);
224✔
427
  if (newSize > 0) {
224✔
NEW
428
    void* dest = taosArrayReserve(pIdx, newSize);
×
NEW
429
    TSDB_CHECK_NULL(dest, code, lino, _exit, terrno);
×
NEW
430
    memcpy(dest, pNewIdx->pData, (size_t)newSize * sizeof(int64_t));
×
431
  }
432

433
_exit:
224✔
434
  taosArrayDestroy(pNewIdx);
224✔
435
  if (code != TSDB_CODE_SUCCESS) {
224✔
NEW
436
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
437
  }
438
  return code;
224✔
439
}
440

441

442
static int32_t mergeAlignExtWinSetOutputBuf(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
2,778,552✔
443
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
444
  if (*pResult == NULL) {
2,778,552✔
445
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
15,948✔
446
    if (!*pResult) {
15,948✔
447
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
448
      return terrno;
×
449
    }
450
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
15,948✔
451
  }
452
  
453
  (*pResult)->win = *pWin;
2,778,552✔
454
  (*pResult)->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,778,552✔
455
  
456
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
2,778,552✔
457
}
458

459

460
static int32_t mergeAlignExtWinGetWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts, STimeWindow** ppWin) {
2,774,664✔
461
  int32_t blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,774,664✔
462
  
463
  // TODO handle desc order
464
  for (int32_t i = blkWinIdx; i < pExtW->pWins->size; ++i) {
5,534,244✔
465
    STimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
5,531,652✔
466
    if (ts == pWin->skey) {
5,532,300✔
467
      extWinSetCurWinIdx(pOperator, i);
2,777,040✔
468
      *ppWin = pWin;
2,776,824✔
469
      return TSDB_CODE_SUCCESS;
2,776,608✔
470
    } else if (ts < pWin->skey) {
2,756,124✔
UNCOV
471
      qError("invalid ts %" PRId64 " for current window idx %d skey %" PRId64, ts, i, pWin->skey);
×
472
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
473
    }
474
  }
475
  
476
  qError("invalid ts %" PRId64 " to find merge aligned ext window, size:%d", ts, (int32_t)pExtW->pWins->size);
×
477
  return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
478
}
479

480
static int32_t mergeAlignExtWinFinalizeResult(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pResultBlock) {
2,772,720✔
481
  int32_t        code = 0, lino = 0;
2,772,720✔
482
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
2,772,720✔
483
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
2,772,936✔
484
  SExprSupp*     pSup = &pOperator->exprSupp;
2,772,936✔
485
  SResultRow*  pResultRow = pMlExtInfo->pResultRow;
2,772,936✔
486
  
487
  finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pOperator->pTaskInfo);
2,773,152✔
488
  
489
  if (pResultRow->numOfRows > 0) {
2,772,720✔
490
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResultBlock, pResultRow->winIdx, pResultRow->numOfRows));
2,772,288✔
491
  }
492

493
_exit:
2,767,968✔
494

495
  if (code) {
2,767,968✔
496
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
497
  }
498

499
  return code;
2,766,672✔
500
}
501

502
static int32_t mergeAlignExtWinAggDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
23,076✔
503
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
23,076✔
504
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
23,076✔
505

506
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
23,076✔
507
  SExprSupp*     pSup = &pOperator->exprSupp;
23,076✔
508
  int32_t        code = 0, lino = 0;
23,076✔
509
  STimeWindow *pWin = NULL;
23,076✔
510

511
  int32_t startPos = 0;
23,076✔
512
  int64_t* tsCols = extWinExtractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
23,076✔
513
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
23,076✔
514
  
515
  code = mergeAlignExtWinGetWinFromTs(pOperator, pExtW, ts, &pWin);
23,076✔
516
  if (code) {
23,076✔
517
    qError("failed to get time window for ts:%" PRId64 ", prim ts index:%d, error:%s", ts, pExtW->primaryTsIndex, tstrerror(code));
×
518
    TAOS_CHECK_EXIT(code);
×
519
  }
520

521
  if (pMlExtInfo->curTs != INT64_MIN && pMlExtInfo->curTs != pWin->skey) {
23,076✔
522
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
1,944✔
523
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
1,944✔
524
  }
525
  
526
  TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));
23,076✔
527

528
  int32_t currPos = startPos;
23,076✔
529
  pMlExtInfo->curTs = pWin->skey;
23,076✔
530
  
531
  while (++currPos < pBlock->info.rows) {
8,325,024✔
532
    if (tsCols[currPos] == pMlExtInfo->curTs) continue;
8,301,732✔
533

534
    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
2,755,476✔
535
      pMlExtInfo->curTs, startPos, currPos, tsCols[currPos]); 
536
    TAOS_CHECK_EXIT(applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
2,755,476✔
537
                                           currPos - startPos, pBlock->info.rows, pSup->numOfExprs));
538

539
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
2,755,044✔
540
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
2,749,860✔
541

542
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[currPos], &pWin));
2,748,564✔
543
    
544
    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
2,753,748✔
545
    startPos = currPos;
2,755,692✔
546
    
547
    TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));
2,755,692✔
548

549
    pMlExtInfo->curTs = pWin->skey;
2,755,476✔
550
  }
551

552
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
46,152✔
553
                                         currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
23,076✔
554

555
_exit:
23,076✔
556

557
  if (code != 0) {
23,076✔
558
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
559
    T_LONG_JMP(pTaskInfo->env, code);
×
560
  }
561
  
562
  return code;
23,076✔
563
}
564

565
static int32_t mergeAlignExtWinBuildWinRowIdx(SOperatorInfo* pOperator, SSDataBlock* pInput, SSDataBlock* pResult) {
×
566
  SExternalWindowOperator* pExtW = pOperator->info;
×
567
  int64_t* tsCols = extWinExtractTsCol(pInput, pExtW->primaryTsIndex, pOperator->pTaskInfo);
×
568
  STimeWindow* pWin = NULL;
×
569
  int32_t code = 0, lino = 0;
×
570
  int64_t prevTs = INT64_MIN;
×
571
  
572
  for (int32_t i = 0; i < pInput->info.rows; ++i) {
×
573
    if (prevTs == tsCols[i]) {
×
574
      continue;
×
575
    }
576
    
577
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[i], &pWin));
×
578
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResult, extWinGetCurWinIdx(pOperator->pTaskInfo), pInput->info.rows - i));
×
579

580
    prevTs = tsCols[i];
×
581
  }
582

583
_exit:
×
584

585
  if (code != 0) {
×
586
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
587
  }
588

589
  return code;  
×
590
}
591

592
static int32_t mergeAlignExtWinProjectDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
593
                                            SSDataBlock* pResultBlock) {
594
  SExternalWindowOperator* pExtW = pOperator->info;
×
595
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
×
596
  int32_t                  code = 0, lino = 0;
×
597
  
598
  TAOS_CHECK_EXIT(projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
×
599
                        GET_STM_RTINFO(pOperator->pTaskInfo)));
600

601
  TAOS_CHECK_EXIT(mergeAlignExtWinBuildWinRowIdx(pOperator, pBlock, pResultBlock));
×
602

603
_exit:
×
604

605
  if (code != 0) {
×
606
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
607
  }
608

609
  return code;
×
610
}
611

612
void mergeAlignExtWinDo(SOperatorInfo* pOperator) {
17,030✔
613
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
17,030✔
614
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
17,030✔
615
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
17,030✔
616
  SResultRow*                          pResultRow = NULL;
17,030✔
617
  int32_t                              code = 0;
17,030✔
618
  SSDataBlock*                         pRes = pExtW->binfo.pRes;
17,030✔
619
  SExprSupp*                           pSup = &pOperator->exprSupp;
17,030✔
620
  int32_t                              lino = 0;
17,030✔
621

622
  taosArrayClear(pExtW->pWinRowIdx);
17,030✔
623
  blockDataCleanup(pRes);
17,030✔
624

625
  while (1) {
22,428✔
626
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
39,458✔
627

628
    if (pBlock == NULL) {
39,458✔
629
      // close last time window
630
      if (pMlExtInfo->curTs != INT64_MIN && EEXT_MODE_AGG == pExtW->mode) {
16,382✔
631
        TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, &pExtW->binfo.resultRowInfo, pRes));
15,948✔
632
      }
633
      setOperatorCompleted(pOperator);
16,382✔
634
      break;
16,382✔
635
    }
636

637
    pRes->info.scanFlag = pBlock->info.scanFlag;
23,076✔
638
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
23,076✔
639
    QUERY_CHECK_CODE(code, lino, _exit);
23,076✔
640

641
    printDataBlock(pBlock, __func__, "externalwindowAlign", pTaskInfo->id.queryId);
23,076✔
642
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);
23,076✔
643

644
    if (EEXT_MODE_SCALAR == pExtW->mode) {
23,076✔
645
      TAOS_CHECK_EXIT(mergeAlignExtWinProjectDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
×
646
    } else {
647
      TAOS_CHECK_EXIT(mergeAlignExtWinAggDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
23,076✔
648
    }
649

650
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
23,076✔
651
      break;
648✔
652
    }
653
  }
654

655
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;
17,030✔
656
  
657
_exit:
17,030✔
658

659
  if (code != 0) {
17,030✔
660
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
661
    pTaskInfo->code = code;
×
662
    T_LONG_JMP(pTaskInfo->env, code);
×
663
  }
664
}
17,030✔
665

666
static int32_t mergeAlignExtWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
32,978✔
667
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
32,978✔
668
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
32,978✔
669
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
32,978✔
670
  int32_t                              code = 0;
32,978✔
671
  int32_t lino = 0;
32,978✔
672

673
  if (pOperator->status == OP_EXEC_DONE) {
32,978✔
674
    (*ppRes) = NULL;
15,948✔
675
    return TSDB_CODE_SUCCESS;
15,948✔
676
  }
677

678
  SSDataBlock* pRes = pExtW->binfo.pRes;
17,030✔
679
  blockDataCleanup(pRes);
17,030✔
680

681
  if (taosArrayGetSize(pExtW->pWins) <= 0) {
17,030✔
682
    size_t size = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
16,382✔
683
    STimeWindow* pWin = taosArrayReserve(pExtW->pWins, size);
16,382✔
684
    TSDB_CHECK_NULL(pWin, code, lino, _exit, terrno);
16,382✔
685

686
    for (int32_t i = 0; i < size; ++i) {
2,790,184✔
687
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
2,773,802✔
688
      pWin[i].skey = pParam->wstart;
2,773,802✔
689
      pWin[i].ekey = pParam->wstart + 1;
2,773,802✔
690
    }
691
    
692
    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
16,382✔
693
  }
694

695
  mergeAlignExtWinDo(pOperator);
17,030✔
696
  
697
  size_t rows = pRes->info.rows;
17,030✔
698
  pOperator->resultInfo.totalRows += rows;
17,030✔
699
  (*ppRes) = (rows == 0) ? NULL : pRes;
17,030✔
700

701
_exit:
17,030✔
702

703
  if (code != 0) {
17,030✔
704
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
705
    pTaskInfo->code = code;
×
706
    T_LONG_JMP(pTaskInfo->env, code);
×
707
  }
708
  return code;
17,030✔
709
}
710

711
int32_t resetMergeAlignedExtWinOperator(SOperatorInfo* pOperator) {
17,250✔
712
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
17,250✔
713
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
17,250✔
714
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
17,250✔
715
  SMergeAlignedIntervalPhysiNode * pPhynode = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;
17,250✔
716
  pOperator->status = OP_NOT_OPENED;
17,250✔
717

718
  taosArrayClear(pExtW->pWins);
17,250✔
719

720
  resetBasicOperatorState(&pExtW->binfo);
17,250✔
721
  pMlExtInfo->pResultRow = NULL;
17,250✔
722
  pMlExtInfo->curTs = INT64_MIN;
17,250✔
723

724
  int32_t code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
34,500✔
725
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
17,250✔
726
                             &pTaskInfo->storageAPI.functionStore);
727
  if (code == 0) {
17,250✔
728
    colDataDestroy(&pExtW->twAggSup.timeWindowData);
17,250✔
729
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
17,250✔
730
  }
731
  return code;
17,250✔
732
}
733

734
int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
15,514✔
735
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
736
  SMergeAlignedIntervalPhysiNode* pPhynode = (SMergeAlignedIntervalPhysiNode*)pNode;
15,514✔
737
  int32_t code = 0;
15,514✔
738
  int32_t lino = 0;
15,514✔
739
  SMergeAlignedExternalWindowOperator* pMlExtInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
15,514✔
740
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
15,514✔
741

742
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
15,514✔
743
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
15,514✔
744
  }
745
  pOperator->pPhyNode = pNode;
15,514✔
746
  if (!pMlExtInfo || !pOperator) {
15,514✔
747
    code = terrno;
×
748
    goto _error;
×
749
  }
750

751
  pMlExtInfo->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
15,514✔
752
  if (!pMlExtInfo->pExtW) {
15,514✔
753
    code = terrno;
×
754
    goto _error;
×
755
  }
756

757
  SExternalWindowOperator* pExtW = pMlExtInfo->pExtW;
15,514✔
758
  SExprSupp* pSup = &pOperator->exprSupp;
15,514✔
759
  pSup->hasWindowOrGroup = true;
15,514✔
760
  pSup->hasWindow = true;
15,514✔
761
  pMlExtInfo->curTs = INT64_MIN;
15,514✔
762

763
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
15,514✔
764
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
15,514✔
765
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
15,514✔
766
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
15,514✔
767

768
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
15,514✔
769
  initResultSizeInfo(&pOperator->resultInfo, 4096);
15,514✔
770

771
  int32_t num = 0;
15,514✔
772
  SExprInfo* pExprInfo = NULL;
15,514✔
773
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
15,514✔
774
  QUERY_CHECK_CODE(code, lino, _error);
15,514✔
775

776
  if (pExtW->mode == EEXT_MODE_AGG) {
15,514✔
777
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
15,514✔
778
                      &pTaskInfo->storageAPI.functionStore);
779
    QUERY_CHECK_CODE(code, lino, _error);
15,514✔
780
  }
781

782
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
15,514✔
783
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
15,514✔
784
  initBasicInfo(&pExtW->binfo, pResBlock);
15,514✔
785

786
  pExtW->pWins = taosArrayInit(4096, sizeof(STimeWindow));
15,514✔
787
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
15,514✔
788

789
  pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
15,514✔
790
  TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
15,514✔
791

792
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
15,514✔
793
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
15,514✔
794
  QUERY_CHECK_CODE(code, lino, _error);
15,514✔
795
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMlExtInfo, pTaskInfo);
15,514✔
796
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignExtWinNext, NULL,
15,514✔
797
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
798
                                         optrDefaultGetNextExtFn, NULL);
799
  setOperatorResetStateFn(pOperator, resetMergeAlignedExtWinOperator);
15,514✔
800

801
  code = appendDownstream(pOperator, &pDownstream, 1);
15,514✔
802
  QUERY_CHECK_CODE(code, lino, _error);
15,514✔
803
  *ppOptrOut = pOperator;
15,514✔
804
  return code;
15,514✔
805
  
806
_error:
×
807
  if (pMlExtInfo) destroyMergeAlignedExternalWindowOperator(pMlExtInfo);
×
808
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
809
  pTaskInfo->code = code;
×
810
  return code;
×
811
}
812

813
static int32_t resetExternalWindowExprSupp(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
282,584✔
814
                                           SExternalWindowPhysiNode* pPhynode) {
815
  int32_t    code = 0, lino = 0, num = 0;
282,584✔
816
  SExprInfo* pExprInfo = NULL;
282,584✔
817
  cleanupExprSuppWithoutFilter(&pExtW->scalarSupp);
282,808✔
818

819
  SNodeList* pNodeList = NULL;
282,611✔
820
  if (pPhynode->window.pProjs) {
282,611✔
821
    pNodeList = pPhynode->window.pProjs;
×
822
  } else {
823
    pNodeList = pPhynode->window.pExprs;
282,835✔
824
  }
825

826
  code = createExprInfo(pNodeList, NULL, &pExprInfo, &num);
282,835✔
827
  QUERY_CHECK_CODE(code, lino, _error);
282,835✔
828
  code = initExprSupp(&pExtW->scalarSupp, pExprInfo, num, &pTaskInfo->storageAPI.functionStore);
282,835✔
829
  QUERY_CHECK_CODE(code, lino, _error);
282,835✔
830
  return code;
282,835✔
831
_error:
×
832
  if (code != TSDB_CODE_SUCCESS) {
×
833
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
834
    pTaskInfo->code = code;
×
835
  }
836
  return code;
×
837
}
838

839
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
282,835✔
840
  int32_t code = 0, lino = 0;
282,835✔
841
  SExternalWindowOperator* pExtW = pOperator->info;
282,835✔
842
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
282,835✔
843
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pOperator->pPhyNode;
282,611✔
844
  pOperator->status = OP_NOT_OPENED;
282,835✔
845

846
  //resetBasicOperatorState(&pExtW->binfo);
847
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
282,611✔
848

849
  pExtW->outputWinId = 0;
282,835✔
850
  pExtW->lastWinId = -1;
282,835✔
851
  pExtW->outputWinNum = 0;
282,835✔
852
  taosArrayClear(pExtW->pWins);
282,835✔
853
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);
282,611✔
854

855
/*
856
  int32_t code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
857
  if (code == 0) {
858
    code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
859
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
860
                       &pTaskInfo->storageAPI.functionStore);
861
  }
862
*/
863
  TAOS_CHECK_EXIT(resetExternalWindowExprSupp(pExtW, pTaskInfo, pPhynode));
282,835✔
864
  colDataDestroy(&pExtW->twAggSup.timeWindowData);
282,611✔
865
  TAOS_CHECK_EXIT(initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window));
282,611✔
866

867
  pExtW->outWinIdx = 0;
282,611✔
868
  pExtW->lastSKey = INT64_MIN;
282,835✔
869
  pExtW->isDynWindow = false;
282,354✔
870

871
  qDebug("%s ext window stat at reset, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
282,835✔
872
      pTaskInfo->id.str, pExtW->stat.resBlockCreated, pExtW->stat.resBlockDestroyed, pExtW->stat.resBlockRecycled, 
873
      pExtW->stat.resBlockReused, pExtW->stat.resBlockAppend);
874

875
_exit:
7,595✔
876

877
  if (code) {
282,835✔
878
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
879
  }
880
  
881
  return code;
282,835✔
882
}
883

884
static EDealRes extWinHasCountLikeFunc(SNode* pNode, void* res) {
6,984,595✔
885
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
6,984,595✔
886
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
2,565,569✔
887
    if (fmIsCountLikeFunc(pFunc->funcId) || (pFunc->hasOriginalFunc && fmIsCountLikeFunc(pFunc->originalFuncId))) {
2,565,569✔
888
      *(bool*)res = true;
744,401✔
889
      return DEAL_RES_END;
745,278✔
890
    }
891
  }
892
  return DEAL_RES_CONTINUE;
6,240,626✔
893
}
894

895

896
static int32_t extWinCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
745,055✔
897
  int32_t code = TSDB_CODE_SUCCESS;
745,055✔
898
  int32_t lino = 0;
745,055✔
899
  SSDataBlock* pBlock = NULL;
745,055✔
900
  if (!tsCountAlwaysReturnValue) {
744,944✔
901
    return TSDB_CODE_SUCCESS;
×
902
  }
903

904
  SExternalWindowOperator* pExtW = pOperator->info;
744,944✔
905

906
  if (!pExtW->hasCountFunc) {
744,944✔
907
    return TSDB_CODE_SUCCESS;
×
908
  }
909

910
  code = createDataBlock(&pBlock);
744,944✔
911
  if (code) {
745,278✔
912
    return code;
×
913
  }
914

915
  pBlock->info.rows = 1;
745,278✔
916
  pBlock->info.capacity = 0;
745,278✔
917

918
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
3,075,329✔
919
    SColumnInfoData colInfo = {0};
2,330,051✔
920
    colInfo.hasNull = true;
2,329,940✔
921
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
2,329,940✔
922
    colInfo.info.bytes = 1;
2,329,940✔
923

924
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
2,329,940✔
925
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
4,687,293✔
926
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
2,357,352✔
927
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
2,357,352✔
928
        int32_t slotId = pFuncParam->pCol->slotId;
2,144,785✔
929
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
2,144,785✔
930
        if (slotId >= numOfCols) {
2,144,563✔
931
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
1,641,238✔
932
          QUERY_CHECK_CODE(code, lino, _end);
1,641,238✔
933

934
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
3,921,158✔
935
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
2,280,031✔
936
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
2,279,920✔
937
          }
938
        }
939
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
212,790✔
940
        // do nothing
941
      }
942
    }
943
  }
944

945
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
745,278✔
946
  QUERY_CHECK_CODE(code, lino, _end);
745,278✔
947

948
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
3,025,309✔
949
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
2,280,031✔
950
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
2,280,031✔
951
    colDataSetNULL(pColInfoData, 0);
952
  }
953
  *ppBlock = pBlock;
745,278✔
954

955
_end:
745,278✔
956
  if (code != TSDB_CODE_SUCCESS) {
745,278✔
957
    blockDataDestroy(pBlock);
×
958
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
959
  }
960
  return code;
745,278✔
961
}
962

963

964

965
static int extWinTsWinCompare(const void* pLeft, const void* pRight) {
9,083,582✔
966
  int64_t ts = *(int64_t*)pLeft;
9,083,582✔
967
  SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)pRight;
9,083,582✔
968
  if (ts < pWin->tw.skey) {
9,083,582✔
969
    return -1;
4,880,905✔
970
  }
971
  if (ts >= pWin->tw.ekey) {
4,202,677✔
972
    return 1;
1,426,060✔
973
  }
974

975
  return 0;
2,776,617✔
976
}
977

978

979
static int32_t extWinGetMultiTbWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol, int64_t rowNum, int32_t* startPos) {
1,909,307✔
980
  int32_t idx = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
1,909,307✔
981
  if (idx >= 0) {
1,909,307✔
982
    *startPos = 0;
1,863,553✔
983
    return idx;
1,863,553✔
984
  }
985

986
  SExtWinTimeWindow* pWin = NULL;
45,754✔
987
  int32_t w = 0;
45,754✔
988
  for (int64_t i = 1; i < rowNum; ++i) {
46,268✔
989
    for (; w < pExtW->pWins->size; ++w) {
116,831,588✔
990
      pWin = TARRAY_GET_ELEM(pExtW->pWins, w);
116,831,588✔
991
      if (tsCol[i] < pWin->tw.skey) {
116,831,588✔
992
        break;
514✔
993
      }
994
      
995
      if (tsCol[i] < pWin->tw.ekey) {
116,831,074✔
996
        *startPos = i;
45,240✔
997
        return w;
45,240✔
998
      }
999
    }
1000
  }
1001

1002
  return -1;
514✔
1003
}
1004

1005
static int32_t extWinGetNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
2,147,483,647✔
1006
  SExternalWindowOperator* pExtW = pOperator->info;
2,147,483,647✔
1007
  if ((*startPos) >= pInfo->rows) {
2,147,483,647✔
1008
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
2,463,281✔
1009
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
1010
    *ppWin = NULL;
2,463,281✔
1011
    return TSDB_CODE_SUCCESS;
2,463,281✔
1012
  }
1013
  
1014
  if (pExtW->blkWinIdx < 0) {
2,147,483,647✔
1015
    pExtW->blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,899,427✔
1016
  } else {
1017
    pExtW->blkWinIdx++;
2,147,483,647✔
1018
  }
1019

1020
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
2,147,483,647✔
1021
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
275,466✔
1022
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1023
    *ppWin = NULL;
275,466✔
1024
    return TSDB_CODE_SUCCESS;
275,466✔
1025
  }
1026
  
1027
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
2,147,483,647✔
1028
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
2,147,483,647✔
1029
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
159,948✔
1030
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1031
    *ppWin = NULL;
159,948✔
1032
    return TSDB_CODE_SUCCESS;
159,948✔
1033
  }
1034

1035
  int32_t r = *startPos;
2,147,483,647✔
1036

1037
  qDebug("%s %s start to get novlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, r);
2,147,483,647✔
1038

1039
  // TODO handle desc order
1040
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
2,147,483,647✔
1041
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
2,147,483,647✔
1042
    for (; r < pInfo->rows; ++r) {
2,147,483,647✔
1043
      if (tsCol[r] < pWin->tw.skey) {
2,147,483,647✔
1044
        continue;
609,647,214✔
1045
      }
1046

1047
      if (tsCol[r] < pWin->tw.ekey) {
2,147,483,647✔
1048
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
2,147,483,647✔
1049
        *ppWin = pWin;
2,147,483,647✔
1050
        *startPos = r;
2,147,483,647✔
1051
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
2,147,483,647✔
1052

1053
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
2,147,483,647✔
1054
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
1055
        
1056
        return TSDB_CODE_SUCCESS;
2,147,483,647✔
1057
      }
1058

1059
      if (!pOperator->pTaskInfo->pStreamRuntimeInfo && tsCol[r] >= pWin->tw.ekey) {
2,147,483,647✔
1060
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
2,147,483,647✔
1061
        *ppWin = pWin;
2,147,483,647✔
1062
        *startPos = r;
2,147,483,647✔
1063
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
2,147,483,647✔
1064

1065
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
2,147,483,647✔
1066
               GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
1067

1068
        return TSDB_CODE_SUCCESS;
2,147,483,647✔
1069
      }
1070

1071
      break;
6,153✔
1072
    }
1073

1074
    if (r == pInfo->rows) {
6,885✔
1075
      break;
732✔
1076
    }
1077
  }
1078

1079
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
732✔
1080
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1081

1082
  *ppWin = NULL;
732✔
1083
  return TSDB_CODE_SUCCESS;
732✔
1084
}
1085

1086
static int32_t extWinGetOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
224,570✔
1087
  SExternalWindowOperator* pExtW = pOperator->info;
224,570✔
1088
  if (pExtW->blkWinIdx < 0) {
224,570✔
1089
    pExtW->blkWinIdx = pExtW->blkWinStartIdx;
20,456✔
1090
  } else {
1091
    pExtW->blkWinIdx++;
204,114✔
1092
  }
1093

1094
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
224,570✔
1095
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
19,724✔
1096
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1097
    *ppWin = NULL;
19,724✔
1098
    return TSDB_CODE_SUCCESS;
19,724✔
1099
  }
1100
  
1101
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
204,846✔
1102
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
204,846✔
1103
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
1104
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1105
    *ppWin = NULL;
×
1106
    return TSDB_CODE_SUCCESS;
×
1107
  }
1108

1109
  int64_t r = 0;
204,846✔
1110

1111
  qDebug("%s %s start to get ovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);
204,846✔
1112
  
1113
  // TODO handle desc order
1114
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
205,578✔
1115
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
205,578✔
1116
    for (r = pExtW->blkRowStartIdx; r < pInfo->rows; ++r) {
417,758✔
1117
      if (tsCol[r] < pWin->tw.skey) {
417,026✔
1118
        pExtW->blkRowStartIdx = r + 1;
212,180✔
1119
        continue;
212,180✔
1120
      }
1121

1122
      if (tsCol[r] < pWin->tw.ekey) {
204,846✔
1123
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
204,114✔
1124
        *ppWin = pWin;
204,114✔
1125
        *startPos = r;
204,114✔
1126
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
204,114✔
1127

1128
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
204,114✔
1129
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);
1130
        
1131
        if ((r + *winRows) < pInfo->rows) {
204,114✔
1132
          pExtW->blkWinStartIdx = pExtW->blkWinIdx + 1;
184,390✔
1133
          pExtW->blkWinStartSet = true;
184,390✔
1134
        }
1135
        
1136
        return TSDB_CODE_SUCCESS;
204,114✔
1137
      }
1138

1139
      break;
732✔
1140
    }
1141

1142
    if (r >= pInfo->rows) {
1,464✔
1143
      if (!pExtW->blkWinStartSet) {
732✔
1144
        pExtW->blkWinStartIdx = pExtW->blkWinIdx;
732✔
1145
      }
1146
      
1147
      break;
732✔
1148
    }
1149
  }
1150

1151
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
732✔
1152
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1153

1154
  *ppWin = NULL;
732✔
1155
  return TSDB_CODE_SUCCESS;
732✔
1156
}
1157

1158

1159
static int32_t extWinGetMultiTbNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
754,838,804✔
1160
  SExternalWindowOperator* pExtW = pOperator->info;
754,838,804✔
1161
  if ((*startPos) >= pInfo->rows) {
754,839,250✔
1162
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
1,863,553✔
1163
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
1164
    *ppWin = NULL;
1,863,553✔
1165
    return TSDB_CODE_SUCCESS;
1,863,553✔
1166
  }
1167
  
1168
  if (pExtW->blkWinIdx < 0) {
752,976,143✔
1169
    pExtW->blkWinIdx = extWinGetMultiTbWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
1,909,307✔
1170
    if (pExtW->blkWinIdx < 0) {
1,909,307✔
1171
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
514✔
1172
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1173
      *ppWin = NULL;
514✔
1174
      return TSDB_CODE_SUCCESS;
514✔
1175
    }
1176

1177
    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
1,908,793✔
1178
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
1,908,793✔
1179
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
1,908,793✔
1180

1181
    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
1,908,570✔
1182
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);
1183
    
1184
    return TSDB_CODE_SUCCESS;
1,908,793✔
1185
  } else {
1186
    pExtW->blkWinIdx++;
751,068,167✔
1187
  }
1188

1189
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
751,068,390✔
1190
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
2,320✔
1191
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1192
    *ppWin = NULL;
2,320✔
1193
    return TSDB_CODE_SUCCESS;
2,320✔
1194
  }
1195
  
1196
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
751,066,475✔
1197
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
751,066,259✔
1198
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
42,920✔
1199
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1200
    *ppWin = NULL;
42,920✔
1201
    return TSDB_CODE_SUCCESS;
42,920✔
1202
  }
1203

1204
  int32_t r = *startPos;
751,022,670✔
1205

1206
  qDebug("%s %s start to get mnovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, r);
751,022,670✔
1207

1208
  // TODO handle desc order
1209
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
751,035,260✔
1210
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
751,035,044✔
1211
    for (; r < pInfo->rows; ++r) {
916,248,344✔
1212
      if (tsCol[r] < pWin->tw.skey) {
916,248,344✔
1213
        continue;
165,213,300✔
1214
      }
1215

1216
      if (tsCol[r] < pWin->tw.ekey) {
751,035,260✔
1217
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
662,846,628✔
1218
        *ppWin = pWin;
662,845,757✔
1219
        *startPos = r;
662,845,757✔
1220
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
662,845,541✔
1221

1222
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
662,845,764✔
1223
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
1224
        
1225
        return TSDB_CODE_SUCCESS;
662,846,405✔
1226
      }
1227

1228
      if (!pOperator->pTaskInfo->pStreamRuntimeInfo && tsCol[r] >= pWin->tw.ekey) {
88,188,632✔
1229
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
88,177,400✔
1230
        *ppWin = pWin;
88,177,400✔
1231
        *startPos = r;
88,177,400✔
1232
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
88,177,400✔
1233

1234
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
88,177,400✔
1235
               GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
1236

1237
        return TSDB_CODE_SUCCESS;
88,177,400✔
1238
      }
1239

1240
      break;
11,232✔
1241
    }
1242

1243
    if (r == pInfo->rows) {
11,232✔
1244
      break;
×
1245
    }
1246
  }
1247

1248
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
×
1249
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1250

1251
  *ppWin = NULL;
×
1252
  return TSDB_CODE_SUCCESS;
×
1253
}
1254

1255
static int32_t extWinGetFirstWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol,
978,664✔
1256
                                       int64_t rowNum, int32_t* startPos) {
1257
  SExtWinTimeWindow* pWin = NULL;
978,664✔
1258
  int32_t            idx = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
978,664✔
1259
  if (idx >= 0) {
978,664✔
1260
    for (int i = idx - 1; i >= 0; --i) {
913,064✔
1261
      pWin = TARRAY_GET_ELEM(pExtW->pWins, i);
×
1262
      if (extWinTsWinCompare(tsCol, pWin) == 0) {
×
1263
        idx = i;
×
1264
      } else {
1265
        break;
×
1266
      }
1267
    }
1268
    *startPos = 0;
913,064✔
1269
    return idx;
913,064✔
1270
  }
1271

1272
  pWin = NULL;
65,600✔
1273
  int32_t w = 0;
65,600✔
1274
  for (int64_t i = 1; i < rowNum; ++i) {
131,640✔
1275
    for (; w < pExtW->pWins->size; ++w) {
153,640✔
1276
      pWin = TARRAY_GET_ELEM(pExtW->pWins, w);
153,640✔
1277
      if (tsCol[i] < pWin->tw.skey) {
153,640✔
1278
        break;
66,040✔
1279
      }
1280

1281
      if (tsCol[i] < pWin->tw.ekey) {
87,600✔
1282
        *startPos = i;
22,000✔
1283
        return w;
22,000✔
1284
      }
1285
    }
1286
  }
1287

1288
  return -1;
43,600✔
1289
}
1290

1291
static int32_t extWinGetMultiTbOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
5,791,030✔
1292
  SExternalWindowOperator* pExtW = pOperator->info;
5,791,030✔
1293
  if (pExtW->blkWinIdx < 0) {
5,791,678✔
1294
    pExtW->blkWinIdx = extWinGetFirstWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
978,664✔
1295
    if (pExtW->blkWinIdx < 0) {
978,664✔
1296
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
43,600✔
1297
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1298
      *ppWin = NULL;
43,600✔
1299
      return TSDB_CODE_SUCCESS;
43,600✔
1300
    }
1301

1302
    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
935,064✔
1303
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
935,064✔
1304
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
935,064✔
1305
    
1306
    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
935,064✔
1307
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);
1308
    
1309
    return TSDB_CODE_SUCCESS;
935,064✔
1310
  } else {
1311
    pExtW->blkWinIdx++;
4,813,878✔
1312
  }
1313

1314
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
4,813,872✔
1315
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
935,064✔
1316
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1317
    *ppWin = NULL;
935,064✔
1318
    return TSDB_CODE_SUCCESS;
935,064✔
1319
  }
1320
  
1321
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
3,879,240✔
1322
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
3,879,456✔
1323
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
1324
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1325
    *ppWin = NULL;
×
1326
    return TSDB_CODE_SUCCESS;
×
1327
  }
1328

1329
  int64_t r = 0;
3,879,240✔
1330

1331
  qDebug("%s %s start to get movlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);
3,879,240✔
1332

1333
  // TODO handle desc order
1334
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
3,942,240✔
1335
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
3,942,672✔
1336
    for (r = pExtW->blkRowStartIdx; r < pInfo->rows; ++r) {
13,226,984✔
1337
      if (tsCol[r] < pWin->tw.skey) {
13,226,552✔
1338
        pExtW->blkRowStartIdx = r + 1;
9,284,312✔
1339
        continue;
9,284,312✔
1340
      }
1341

1342
      if (tsCol[r] < pWin->tw.ekey) {
3,942,672✔
1343
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
3,879,672✔
1344
        *ppWin = pWin;
3,879,672✔
1345
        *startPos = r;
3,879,672✔
1346
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
3,879,672✔
1347

1348
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
3,879,672✔
1349
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);
1350
        
1351
        return TSDB_CODE_SUCCESS;
3,879,672✔
1352
      }
1353

1354
      break;
63,000✔
1355
    }
1356

1357
    if (r >= pInfo->rows) {
62,790✔
1358
      break;
×
1359
    }
1360
  }
1361

1362
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
×
1363
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1364

1365
  *ppWin = NULL;
×
1366
  return TSDB_CODE_SUCCESS;
×
1367
}
1368

1369

1370
static int32_t extWinGetWinStartPos(STimeWindow win, const SDataBlockInfo* pBlockInfo, int32_t lastEndPos, int32_t order, int32_t* nextPos, int64_t* tsCol) {
×
1371
  bool ascQuery = order == TSDB_ORDER_ASC;
×
1372

1373
  if (win.ekey <= pBlockInfo->window.skey && ascQuery) {
×
1374
    return -2;
×
1375
  }
1376
//if (win.skey > pBlockInfo->window.ekey && !ascQuery) return -2;
1377

1378
  if (win.skey > pBlockInfo->window.ekey && ascQuery) return -1;
×
1379
//if (win.ekey < pBlockInfo->window.skey && !ascQuery) return -1;
1380

1381
  while (true) {
1382
    if (win.ekey <= tsCol[lastEndPos + 1] && ascQuery) return -2;
×
1383
    if (win.skey <= tsCol[lastEndPos + 1] && ascQuery) break;
×
1384
    lastEndPos++;
×
1385
  }
1386

1387
  *nextPos = lastEndPos + 1;
×
1388
  return 0;
×
1389
}
1390

1391
static int32_t extWinAggSetWinOutputBuf(SOperatorInfo* pOperator, SExtWinTimeWindow* win, SExprSupp* pSupp, 
2,147,483,647✔
1392
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
1393
  int32_t code = 0, lino = 0;
2,147,483,647✔
1394
  SResultRow* pResultRow = NULL;
2,147,483,647✔
1395
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
2,147,483,647✔
1396
  
1397
#if 0
1398
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
1399
                                                  true, tableGroupId, pTaskInfo, true, pAggSup, true);
1400
  if (pResultRow == NULL) {
1401
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
1402
    return pTaskInfo->code;
1403
  }
1404

1405
  qDebug("current result rows num:%d", tSimpleHashGetSize(pAggSup->pResultRowHashTable));
1406

1407
#else
1408
  if (win->winOutIdx >= 0) {
2,147,483,647✔
1409
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
2,147,483,647✔
1410
  } else {
1411
    win->winOutIdx = pExtW->outWinIdx++;
2,147,483,647✔
1412
    
1413
    qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
2,147,483,647✔
1414

1415
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
2,147,483,647✔
1416
    
1417
    memset(pResultRow, 0, pAggSup->resultRowSize);
2,147,483,647✔
1418

1419
    pResultRow->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,147,483,647✔
1420
    TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
2,147,483,647✔
1421
  }
1422
#endif
1423

1424
  // set time window for current result
1425
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
2,147,483,647✔
1426

1427
_exit:
2,147,483,647✔
1428
  
1429
  if (code) {
2,147,483,647✔
1430
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1431
  }
1432

1433
  return code;
2,147,483,647✔
1434
}
1435

1436
static int32_t extWinAggDo(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
2,147,483,647✔
1437
                                  SSDataBlock* pInputBlock) {
1438
  if (pOperator->pTaskInfo->pStreamRuntimeInfo && forwardRows == 0) {
2,147,483,647✔
1439
    return TSDB_CODE_SUCCESS;
×
1440
  }
1441

1442
  SExprSupp*               pSup = &pOperator->exprSupp;
2,147,483,647✔
1443
  SExternalWindowOperator* pExtW = pOperator->info;
2,147,483,647✔
1444
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
2,147,483,647✔
1445
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
2,147,483,647✔
1446

1447
}
1448

1449
static bool extWinLastWinClosed(SExternalWindowOperator* pExtW) {
1,168✔
1450
  if (pExtW->outWinIdx <= 0 || (pExtW->multiTableMode && !pExtW->inputHasOrder)) {
1,168✔
1451
    return false;
1,168✔
1452
  }
1453

1454
  if (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc) {
×
1455
    return true;
×
1456
  }
1457

1458
  SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outWinIdx - 1);
×
1459
  if (0 == listNEles(pList)) {
×
1460
    return true;
×
1461
  }
1462

1463
  SListNode* pNode = listTail(pList);
×
1464
  SArray* pBlkWinIdx = *((SArray**)pNode->data + 1);
×
1465
  int64_t* pIdx = taosArrayGetLast(pBlkWinIdx);
×
1466
  if (pIdx && *(int32_t*)pIdx < pExtW->blkWinStartIdx) {
×
1467
    return true;
×
1468
  }
1469

1470
  return false;
×
1471
}
1472

1473
static int32_t extWinGetWinResBlock(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
2,506✔
1474
  SExternalWindowOperator* pExtW = pOperator->info;
2,506✔
1475
  SList*                   pList = NULL;
2,506✔
1476
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
2,506✔
1477
  
1478
  if (pWin->winOutIdx >= 0) {
2,506✔
1479
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
1,338✔
1480
  } else {
1481
    if (extWinLastWinClosed(pExtW)) {
1,168✔
1482
      pWin->winOutIdx = pExtW->outWinIdx - 1;
×
1483
      pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1484
    } else {
1485
      pWin->winOutIdx = pExtW->outWinIdx++;
1,168✔
1486
      pList = tdListNew(POINTER_BYTES * 2);
1,168✔
1487
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);
1,168✔
1488
      SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
1,168✔
1489
      extWinRecycleBlockList(pExtW, ppList);
1,168✔
1490
      *ppList = pList;
1,168✔
1491
    }
1492
  }
1493
  
1494
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));
2,506✔
1495

1496
_exit:
2,506✔
1497

1498
  if (code) {
2,506✔
1499
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1500
  }
1501

1502
  return code;
2,506✔
1503
}
1504

1505
static int32_t extWinProjectDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
2,506✔
1506
  SExternalWindowOperator* pExtW = pOperator->info;
2,506✔
1507
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
2,506✔
1508
  SSDataBlock*             pResBlock = NULL;
2,506✔
1509
  SArray*                  pIdx = NULL;
2,506✔
1510
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
2,506✔
1511
  
1512
  TAOS_CHECK_EXIT(extWinGetWinResBlock(pOperator, rows, pWin, &pResBlock, &pIdx));
2,506✔
1513

1514
  qDebug("%s %s win[%" PRId64 ", %" PRId64 "] got res block %p winRowIdx %p, winOutIdx:%d, capacity:%d", 
2,506✔
1515
      pOperator->pTaskInfo->id.str, __func__, pWin->tw.skey, pWin->tw.ekey, pResBlock, pIdx, pWin->winOutIdx, pResBlock->info.capacity);
1516
  
1517
  if (!pExtW->pTmpBlock) {
2,506✔
1518
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
942✔
1519
  } else {
1520
    blockDataCleanup(pExtW->pTmpBlock);
1,564✔
1521
  }
1522
  
1523
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
2,506✔
1524

1525
  qDebug("%s %s start to copy %d rows to tmp blk", pOperator->pTaskInfo->id.str, __func__, rows);
2,506✔
1526
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
2,506✔
1527

1528
  qDebug("%s %s start to apply project to tmp blk", pOperator->pTaskInfo->id.str, __func__);
2,506✔
1529
  TAOS_CHECK_EXIT(projectApplyFunctionsWithSelect(pExprSup->pExprInfo, pResBlock, pExtW->pTmpBlock, pExprSup->pCtx, pExprSup->numOfExprs,
2,506✔
1530
        NULL, GET_STM_RTINFO(pOperator->pTaskInfo), true, pExprSup->hasIndefRowsFunc));
1531

1532
  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));
2,506✔
1533

1534
_exit:
2,506✔
1535

1536
  if (code) {
2,506✔
1537
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1538
  } else {
1539
    qDebug("%s %s project succeed", pOperator->pTaskInfo->id.str, __func__);
2,506✔
1540
  }
1541
  
1542
  return code;
2,506✔
1543
}
1544

1545
static int32_t extWinProjectOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
2,054✔
1546
  SExternalWindowOperator* pExtW = pOperator->info;
2,054✔
1547
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
2,054✔
1548
  SExtWinTimeWindow*       pWin = NULL;
2,054✔
1549
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
2,054✔
1550
  int32_t                  startPos = 0, winRows = 0;
2,054✔
1551
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
2,054✔
1552
  
1553
  while (true) {
1554
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
4,560✔
1555
    if (pWin == NULL) {
4,560✔
1556
      break;
2,054✔
1557
    }
1558

1559
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") project start, ascScan:%d, startPos:%d, winRows:%d",
2,506✔
1560
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1561
    
1562
    TAOS_CHECK_EXIT(extWinProjectDo(pOperator, pInputBlock, startPos, winRows, pWin));
2,506✔
1563
    
1564
    startPos += winRows;
2,506✔
1565
  }
1566
  
1567
_exit:
2,054✔
1568

1569
  if (code) {
2,054✔
1570
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1571
  }
1572

1573
  return code;
2,054✔
1574
}
1575

1576
static int32_t extWinIndefRowsDoImpl(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
×
1577
  SExternalWindowOperator* pExtW = pOperator->info;
×
1578
  SOptrBasicInfo*     pInfo = &pExtW->binfo;
×
1579
  SExprSupp*          pSup = &pOperator->exprSupp;
×
1580
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
×
1581
  int32_t order = pInfo->inputTsOrder;
×
1582
  int32_t scanFlag = pBlock->info.scanFlag;
×
1583
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
×
1584

1585
  SExprSupp* pScalarSup = &pExtW->scalarSupp;
×
1586
  if (pScalarSup->pExprInfo != NULL) {
×
1587
    TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
1588
                                 pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1589
  }
1590

1591
  TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, order, scanFlag, false));
×
1592

1593
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));
×
1594

1595
  TAOS_CHECK_EXIT(projectApplyFunctions(pSup->pExprInfo, pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
×
1596
                               pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1597

1598
_exit:
×
1599

1600
  if (code) {
×
1601
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
1602
  }
1603

1604
  return code;
×
1605
}
1606

1607
static int32_t extWinIndefRowsSetWinOutputBuf(SExternalWindowOperator* pExtW, SExtWinTimeWindow* win, SExprSupp* pSupp, 
×
1608
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo, bool reset) {
1609
  int32_t code = 0, lino = 0;
×
1610
  SResultRow* pResultRow = NULL;
×
1611

1612
  pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
×
1613
  
1614
  qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
×
1615

1616
  if (reset) {
×
1617
    memset(pResultRow, 0, pAggSup->resultRowSize);
×
1618
    for (int32_t k = 0; k < pSupp->numOfExprs; ++k) {
×
1619
      SqlFunctionCtx* pCtx = &pSupp->pCtx[k];
×
1620
      pCtx->pOutput = NULL;
×
1621
    }
1622
  }
1623

1624
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
×
1625

1626
  // set time window for current result
1627
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
×
1628

1629
_exit:
×
1630
  
1631
  if (code) {
×
1632
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1633
  }
1634

1635
  return code;
×
1636
}
1637

1638
static int32_t extWinGetSetWinResBlockBuf(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
×
1639
  SExternalWindowOperator* pExtW = pOperator->info;
×
1640
  SList*                   pList = NULL;
×
1641
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1642
  
1643
  if (pWin->winOutIdx >= 0) {
×
1644
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1645
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, false));
×
1646
  } else {
1647
    if (extWinLastWinClosed(pExtW)) {
×
1648
      pWin->winOutIdx = pExtW->outWinIdx - 1;
×
1649
      pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1650
    } else {
1651
      pWin->winOutIdx = pExtW->outWinIdx++;
×
1652
      pList = tdListNew(POINTER_BYTES * 2);
×
1653
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);
×
1654
      SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1655
      extWinRecycleBlockList(pExtW, ppList);
×
1656
      *ppList = pList;
×
1657
    }
1658
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, true));
×
1659
  }
1660
  
1661
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));
×
1662

1663
_exit:
×
1664

1665
  if (code) {
×
1666
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1667
  }
1668

1669
  return code;
×
1670
}
1671

1672

1673
static int32_t extWinIndefRowsDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
×
1674
  SExternalWindowOperator* pExtW = pOperator->info;
×
1675
  SSDataBlock*             pResBlock = NULL;
×
1676
  SArray*                  pIdx = NULL;
×
1677
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1678
  
1679
  TAOS_CHECK_EXIT(extWinGetSetWinResBlockBuf(pOperator, rows, pWin, &pResBlock, &pIdx));
×
1680
  
1681
  if (!pExtW->pTmpBlock) {
×
1682
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
×
1683
  } else {
1684
    blockDataCleanup(pExtW->pTmpBlock);
×
1685
  }
1686
  
1687
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
×
1688

1689
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
×
1690
  TAOS_CHECK_EXIT(extWinIndefRowsDoImpl(pOperator, pResBlock, pExtW->pTmpBlock));
×
1691

1692
  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));
×
1693

1694
_exit:
×
1695

1696
  if (code) {
×
1697
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1698
  }
1699
  
1700
  return code;
×
1701
}
1702

1703

1704
static int32_t extWinIndefRowsOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
×
1705
  SExternalWindowOperator* pExtW = pOperator->info;
×
1706
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
×
1707
  SExtWinTimeWindow*       pWin = NULL;
×
1708
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
×
1709
  int32_t                  startPos = 0, winRows = 0;
×
1710
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1711
  
1712
  while (true) {
1713
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
×
1714
    if (pWin == NULL) {
×
1715
      break;
×
1716
    }
1717

1718
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") indefRows start, ascScan:%d, startPos:%d, winRows:%d",
×
1719
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1720
    
1721
    TAOS_CHECK_EXIT(extWinIndefRowsDo(pOperator, pInputBlock, startPos, winRows, pWin));
×
1722
    
1723
    startPos += winRows;
×
1724
  }
1725
  
1726
_exit:
×
1727

1728
  if (code) {
×
1729
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1730
  }
1731

1732
  return code;
×
1733
}
1734

1735
static int32_t extWinNonAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,168✔
1736
  SExternalWindowOperator* pExtW = pOperator->info;
1,168✔
1737
  int32_t                  numOfWin = pExtW->outWinIdx;
1,168✔
1738
  int32_t                  code = TSDB_CODE_SUCCESS;
1,168✔
1739
  int32_t                  lino = 0;
1,168✔
1740
  SSDataBlock*             pRes = NULL;
1,168✔
1741

1742
  for (; pExtW->outputWinId < numOfWin; pExtW->outputWinId++, extWinIncCurWinOutIdx(pOperator->pTaskInfo->pStreamRuntimeInfo)) {
1,168✔
1743
    SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
1,168✔
1744
    if (listNEles(pList) <= 0) {
1,168✔
1745
      continue;
×
1746
    }
1747

1748
    SListNode* pNode = tdListPopHead(pList);
1,168✔
1749
    pRes = *(SSDataBlock**)pNode->data;
1,168✔
1750
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = *(SArray**)((SArray**)pNode->data + 1);
1,168✔
1751
    pExtW->pLastBlkNode = pNode;
1,168✔
1752

1753
    if (listNEles(pList) <= 0) {
1,168✔
1754
      pExtW->outputWinId++;
1,168✔
1755
      extWinIncCurWinOutIdx(pOperator->pTaskInfo->pStreamRuntimeInfo);
1,168✔
1756
    }
1757

1758
    break;
1,168✔
1759
  }
1760

1761
  if (pRes) {
1,168✔
1762
    qDebug("%s result generated, rows:%" PRId64 , GET_TASKID(pOperator->pTaskInfo), pRes->info.rows);
1,168✔
1763
    pRes->info.version = pOperator->pTaskInfo->version;
1,168✔
1764
    pRes->info.dataLoad = 1;
1,168✔
1765
  } else {
1766
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = NULL;
×
1767
    qDebug("%s ext window done", GET_TASKID(pOperator->pTaskInfo));
×
1768
  }
1769

1770
  *ppRes = (pRes && pRes->info.rows > 0) ? pRes : NULL;
1,168✔
1771

1772
_exit:
1,168✔
1773

1774
  if (code != TSDB_CODE_SUCCESS) {
1,168✔
1775
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1776
  }
1777

1778
  return code;
1,168✔
1779
}
1780

1781
static int32_t extWinAggHandleEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains, SExtWinTimeWindow* pWin) {
2,147,483,647✔
1782
  int32_t code = 0, lino = 0;
2,147,483,647✔
1783
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
2,147,483,647✔
1784
  SExprSupp* pSup = &pOperator->exprSupp;
2,147,483,647✔
1785
  int32_t currIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,147,483,647✔
1786

1787
  if (NULL == pExtW->pEmptyInputBlock || (pWin && pWin->tw.skey == pExtW->lastSKey)) {
2,147,483,647✔
1788
    goto _exit;
2,147,483,647✔
1789
  }
1790

1791
  bool ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
2,147,483,647✔
1792
  int32_t endIdx = allRemains ? (pExtW->pWins->size - 1) : (currIdx - 1);
2,147,483,647✔
1793
  SResultRowInfo* pResultRowInfo = &pExtW->binfo.resultRowInfo;
2,147,483,647✔
1794
  SSDataBlock* pInput = pExtW->pEmptyInputBlock;
2,147,483,647✔
1795

1796
  if ((pExtW->lastWinId + 1) <= endIdx) {
2,147,483,647✔
1797
    TAOS_CHECK_EXIT(setInputDataBlock(pSup, pExtW->pEmptyInputBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
827,328✔
1798
  }
1799
  
1800
  for (int32_t i = pExtW->lastWinId + 1; i <= endIdx; ++i) {
2,147,483,647✔
1801
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
2,014,070,922✔
1802

1803
    extWinSetCurWinIdx(pOperator, i);
2,014,070,699✔
1804
    qDebug("%s %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
2,014,070,699✔
1805
           GET_TASKID(pOperator->pTaskInfo), i, pWin->tw.skey, pWin->tw.ekey, ascScan);
1806

1807
    TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pWin, pSup, &pExtW->aggSup, pOperator->pTaskInfo));
2,014,070,699✔
1808

1809
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
2,014,070,922✔
1810
    code = extWinAggDo(pOperator, 0, 1, pInput);
2,014,070,699✔
1811
    pExtW->lastWinId = i;  
2,014,070,922✔
1812
    TAOS_CHECK_EXIT(code);
2,014,070,922✔
1813
  }
1814

1815
  
1816
_exit:
2,147,483,647✔
1817

1818
  if (code) {
2,147,483,647✔
1819
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
1820
  } else {
1821
    if (pBlock) {
2,147,483,647✔
1822
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true));
2,147,483,647✔
1823
    }
1824

1825
    if (!allRemains) {
2,147,483,647✔
1826
      extWinSetCurWinIdx(pOperator, currIdx);  
2,147,483,647✔
1827
    }
1828
  }
1829

1830
  return code;
2,147,483,647✔
1831
}
1832

1833
static int32_t extWinAggOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
5,805,800✔
1834
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
5,805,800✔
1835
  int32_t                  startPos = 0, winRows = 0;
5,805,800✔
1836
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
5,805,800✔
1837
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
5,805,369✔
1838
  int32_t                  code = 0, lino = 0;
5,805,369✔
1839
  SExtWinTimeWindow*       pWin = NULL;
5,805,369✔
1840
  bool                     scalarCalc = false;
5,805,369✔
1841

1842
  while (true) {
1843
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
2,147,483,647✔
1844
    if (pWin == NULL) {
2,147,483,647✔
1845
      break;
5,805,800✔
1846
    }
1847

1848
    TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pInputBlock, false, pWin));
2,147,483,647✔
1849

1850
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") agg start, ascScan:%d, startPos:%d, winRows:%d",
2,147,483,647✔
1851
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1852

1853
    if (!scalarCalc) {
2,147,483,647✔
1854
      if (pExtW->scalarSupp.pExprInfo) {
5,760,954✔
1855
        SExprSupp* pScalarSup = &pExtW->scalarSupp;
5,832✔
1856
        TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pInputBlock, pInputBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
5,832✔
1857
                                     pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1858
      }
1859
      
1860
      scalarCalc = true;
5,760,954✔
1861
    }
1862

1863
    if (pWin->tw.skey != pExtW->lastSKey || pWin->tw.skey == INT64_MIN) {
2,147,483,647✔
1864
      TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo));
2,147,483,647✔
1865
    }
1866
    
1867
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
2,147,483,647✔
1868
    TAOS_CHECK_EXIT(extWinAggDo(pOperator, startPos, winRows, pInputBlock));
2,147,483,647✔
1869
    
1870
    pExtW->lastSKey = pWin->tw.skey;
2,147,483,647✔
1871
    pExtW->lastWinId = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,147,483,647✔
1872
    startPos += winRows;
2,147,483,647✔
1873
  }
1874

1875
_exit:
5,805,800✔
1876

1877
  if (code) {
5,805,800✔
1878
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1879
  }
1880

1881
  return code;
5,805,800✔
1882
}
1883

1884
static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,919,067✔
1885
  SExternalWindowOperator* pExtW = pOperator->info;
1,919,067✔
1886
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,919,067✔
1887
  SSDataBlock*             pBlock = pExtW->binfo.pRes;
1,919,067✔
1888
  int32_t                  code = TSDB_CODE_SUCCESS;
1,919,067✔
1889
  int32_t                  lino = 0;
1,919,067✔
1890
  SExprInfo*               pExprInfo = pOperator->exprSupp.pExprInfo;
1,919,067✔
1891
  int32_t                  numOfExprs = pOperator->exprSupp.numOfExprs;
1,919,067✔
1892
  int32_t*                 rowEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;
1,919,067✔
1893
  SqlFunctionCtx*          pCtx = pOperator->exprSupp.pCtx;
1,919,067✔
1894
  int32_t                  numOfWin = pExtW->outWinIdx;
1,919,067✔
1895

1896
  pBlock->info.version = pTaskInfo->version;
1,919,067✔
1897
  blockDataCleanup(pBlock);
1,919,067✔
1898
  taosArrayClear(pExtW->pWinRowIdx);
1,919,067✔
1899

1900
  for (; pExtW->outputWinId < pExtW->pWins->size; ++pExtW->outputWinId) {
2,147,483,647✔
1901
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->outputWinId);
2,147,483,647✔
1902
    int32_t            winIdx = pWin->winOutIdx;
2,147,483,647✔
1903
    if (winIdx < 0) {
2,147,483,647✔
1904
      continue;
23,103,808✔
1905
    }
1906

1907
    pExtW->outputWinNum++;
2,147,483,647✔
1908
    SResultRow* pRow = (SResultRow*)((char*)pExtW->pResultRow + winIdx * pExtW->aggSup.resultRowSize);
2,147,483,647✔
1909

1910
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
2,147,483,647✔
1911

1912
    // no results, continue to check the next one
1913
    if (pRow->numOfRows == 0) {
2,147,483,647✔
1914
      continue;
×
1915
    }
1916

1917
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
2,147,483,647✔
1918
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + numOfWin - pExtW->outputWinNum;
1,539,748✔
1919
      TAOS_CHECK_EXIT(blockDataEnsureCapacity(pBlock, newSize));
1,539,748✔
1920
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
1,539,748✔
1921
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
1922
    }
1923

1924
    TAOS_CHECK_EXIT(copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo));
2,147,483,647✔
1925

1926
    pBlock->info.rows += pRow->numOfRows;
2,147,483,647✔
1927

1928
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pRow->winIdx, pRow->numOfRows));
2,147,483,647✔
1929

1930
    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
2,147,483,647✔
1931
      ++pExtW->outputWinId;
1,408,810✔
1932
      break;
1,408,810✔
1933
    }
1934
  }
1935

1936
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
1,918,740✔
1937
         pBlock->info.id.groupId);
1938

1939
  int32_t rowsBeforeFilter = pBlock->info.rows;
1,918,740✔
1940
  SColumnInfoData* pFilterRes = NULL;
1,919,067✔
1941
  TAOS_CHECK_EXIT(doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, &pFilterRes));
1,919,067✔
1942
  if (pBlock->info.rows < rowsBeforeFilter) {
1,919,067✔
1943
    if (pFilterRes != NULL) {
224✔
1944
      TAOS_CHECK_EXIT(extWinRebuildWinIdxByFilter(pTaskInfo, pExtW->pWinRowIdx, rowsBeforeFilter, pFilterRes));
224✔
1945
    } else {
1946
      // no indicator means all rows are filtered out by short-circuit path
NEW
1947
      taosArrayClear(pExtW->pWinRowIdx);
×
1948
    }
1949
  }
1950

1951
  pBlock->info.dataLoad = 1;
1,919,067✔
1952

1953
  *ppRes = (pBlock->info.rows > 0) ? pBlock : NULL;
1,919,067✔
1954

1955
  if (*ppRes) {
1,919,067✔
1956
    (*ppRes)->info.window.skey = pExtW->orgTableTimeRange.skey;
1,678,900✔
1957
    (*ppRes)->info.window.ekey = pExtW->orgTableTimeRange.ekey;
1,678,900✔
1958
  }
1959
  if (pOperator->pTaskInfo->pStreamRuntimeInfo) {
1,919,067✔
1960
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;
470,758✔
1961
  }
1962

1963
_exit:
1,916,955✔
1964
  colDataDestroy(pFilterRes);
1,919,067✔
1965
  taosMemoryFree(pFilterRes);
1,919,067✔
1966

1967
  if (code != TSDB_CODE_SUCCESS) {
1,919,067✔
1968
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1969
  }
1970

1971
  return code;
1,919,067✔
1972
}
1973

1974
static int32_t extWinInitResultRow(SExecTaskInfo*        pTaskInfo, SExternalWindowOperator* pExtW, int32_t winNum) {
1,689,194✔
1975
  if (EEXT_MODE_SCALAR == pExtW->mode) {
1,689,194✔
1976
    return TSDB_CODE_SUCCESS;
942✔
1977
  }
1978

1979
  if (winNum <= pExtW->resultRowCapacity) {
1,688,252✔
1980
    return TSDB_CODE_SUCCESS;
137,653✔
1981
  }
1982
  
1983
  taosMemoryFreeClear(pExtW->pResultRow);
1,550,823✔
1984
  pExtW->resultRowCapacity = -1;
1,551,047✔
1985

1986
  int32_t code = 0, lino = 0;
1,550,823✔
1987
  
1988
  pExtW->pResultRow = taosMemoryCalloc(winNum, pExtW->aggSup.resultRowSize);
1,550,823✔
1989
  QUERY_CHECK_NULL(pExtW->pResultRow, code, lino, _exit, terrno);
1,550,823✔
1990

1991
  pExtW->resultRowCapacity = winNum;
1,550,823✔
1992

1993
_exit:
1,550,823✔
1994

1995
  if (code) {
1,550,823✔
1996
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1997
  }
1998

1999
  return code;
1,550,823✔
2000
}
2001

2002
static void extWinFreeResultRow(SExternalWindowOperator* pExtW) {
240,167✔
2003
  if (pExtW->resultRowCapacity * pExtW->aggSup.resultRowSize >= 1048576) {
240,167✔
2004
    taosMemoryFreeClear(pExtW->pResultRow);
1,944✔
2005
    pExtW->resultRowCapacity = -1;
1,944✔
2006
  }
2007
  if (pExtW->binfo.pRes && pExtW->binfo.pRes->info.rows * pExtW->aggSup.resultRowSize >= 1048576) {
240,167✔
2008
    blockDataFreeCols(pExtW->binfo.pRes);
×
2009
  }
2010
}
240,167✔
2011

2012
static int32_t extWinInitWindowList(SExternalWindowOperator* pExtW, SExecTaskInfo*        pTaskInfo) {
240,665✔
2013
  if (taosArrayGetSize(pExtW->pWins) > 0) {
240,665✔
2014
    return TSDB_CODE_SUCCESS;
×
2015
  }
2016
  
2017
  int32_t code = 0, lino = 0;
240,885✔
2018
  SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
240,885✔
2019
  size_t size = taosArrayGetSize(pInfo->pStreamPesudoFuncVals);
241,109✔
2020
  SExtWinTimeWindow* pWin = taosArrayReserve(pExtW->pWins, size);
241,109✔
2021
  TSDB_CHECK_NULL(pWin, code, lino, _exit, terrno);
240,885✔
2022

2023
  TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, size));
240,885✔
2024

2025
  if (pExtW->timeRangeExpr && pExtW->timeRangeExpr->needCalc) {
241,109✔
2026
    TAOS_CHECK_EXIT(scalarCalculateExtWinsTimeRange(pExtW->timeRangeExpr, pInfo, pWin));
77,082✔
2027
    if (qDebugFlag & DEBUG_DEBUG) {
77,082✔
2028
      for (int32_t i = 0; i < size; ++i) {
265,770✔
2029
        qDebug("%s the %d/%d ext window calced initialized, TR[%" PRId64 ", %" PRId64 ")", 
188,688✔
2030
            pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
2031
      }
2032
    }
2033
  } else {
2034
    for (int32_t i = 0; i < size; ++i) {
12,563,368✔
2035
      SSTriggerCalcParam* pParam = taosArrayGet(pInfo->pStreamPesudoFuncVals, i);
12,399,565✔
2036

2037
      pWin[i].tw.skey = pParam->wstart;
12,399,341✔
2038
      pWin[i].tw.ekey = pParam->wend + ((pInfo->triggerType != STREAM_TRIGGER_SLIDING) ? 1 : 0);
12,399,236✔
2039
      pWin[i].winOutIdx = -1;
12,399,452✔
2040

2041
      qDebug("%s the %d/%d ext window initialized, TR[%" PRId64 ", %" PRId64 ")", 
12,399,117✔
2042
          pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
2043
    }
2044
  }
2045
  
2046
  pExtW->outputWinId = pInfo->curIdx;
240,885✔
2047
  pExtW->lastWinId = -1;
241,109✔
2048
  pExtW->blkWinStartIdx = pInfo->curIdx;
241,109✔
2049

2050
_exit:
241,109✔
2051

2052
  if (code) {
241,109✔
2053
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
2054
  }
2055

2056
  return code;
241,109✔
2057
}
2058

2059
static bool extWinNonAggGotResBlock(SExternalWindowOperator* pExtW) {
2,054✔
2060
  if (pExtW->multiTableMode && !pExtW->inputHasOrder) {
2,054✔
2061
    return false;
1,998✔
2062
  }
2063
  int32_t remainWin = pExtW->outWinIdx - pExtW->outputWinId;
56✔
2064
  if (remainWin > 1 && (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc)) {
56✔
2065
    return true;
×
2066
  }
2067
  
2068
  SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
56✔
2069
  if (!pList || listNEles(pList) <= 0) {
56✔
2070
    return false;
×
2071
  }
2072
  if (listNEles(pList) > 1) {
56✔
2073
    return true;
×
2074
  }
2075

2076
  SListNode* pNode = listHead(pList);
56✔
2077
  SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
56✔
2078
  int32_t* winIdx = taosArrayGetLast(pIdx);
56✔
2079
  if (winIdx && *winIdx < pExtW->blkWinStartIdx) {
56✔
2080
    return true;
×
2081
  }
2082

2083
  return false;
56✔
2084
}
2085

2086
static int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
3,399,729✔
2087
  int32_t code = TSDB_CODE_SUCCESS;
3,399,729✔
2088
  int32_t lino = 0;
3,399,729✔
2089
  int32_t tsIndex = -1;
3,399,729✔
2090
  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
3,942,609✔
2091
    SColumnInfoData *pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
3,942,609✔
2092
    QUERY_CHECK_NULL(pCol, code, lino, _return, terrno)
3,942,609✔
2093
    if (pCol->info.colId == tsSlotId) {
3,942,609✔
2094
      tsIndex = i;
3,399,729✔
2095
      break;
3,399,729✔
2096
    }
2097
  }
2098

2099
  if (tsIndex == -1) {
3,399,729✔
2100
    tsIndex = (int32_t)taosArrayGetSize(pBlock->pDataBlock) - 1;
×
2101
  }
2102

2103
  SColumnInfoData *pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsIndex);
3,399,729✔
2104
  QUERY_CHECK_NULL(pColData, code, lino, _return, terrno)
3,399,729✔
2105

2106
  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, 0), 0);
3,399,729✔
2107
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, pBlock->info.rows - 1), 0);
3,399,729✔
2108

2109
  return code;
3,399,729✔
2110
_return:
×
2111
  qError("failed to get time window of block, %s code:%s, line:%d", __func__, tstrerror(code), lino);
×
2112
  return code;
×
2113
}
2114

2115
static int32_t extWinOpen(SOperatorInfo* pOperator) {
1,689,418✔
2116
  if (OPTR_IS_OPENED(pOperator) && !pOperator->pOperatorGetParam) {
1,689,418✔
2117
    return TSDB_CODE_SUCCESS;
×
2118
  }
2119
  
2120
  int32_t                  code = 0;
1,689,418✔
2121
  int32_t                  lino = 0;
1,689,418✔
2122
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,689,418✔
2123
  SOperatorInfo*           pDownstream = pOperator->pDownstream[0];
1,689,418✔
2124
  SExternalWindowOperator* pExtW = pOperator->info;
1,689,194✔
2125
  SExprSupp*               pSup = &pOperator->exprSupp;
1,689,418✔
2126

2127
  if (pOperator->pOperatorGetParam) {
1,689,194✔
2128
    SOperatorParam*               pParam = (SOperatorParam*)(pOperator->pOperatorGetParam);
1,448,309✔
2129
    SOperatorParam*               pDownParam = (SOperatorParam*)(pOperator->pDownstreamGetParams[0]);
1,448,309✔
2130
    SExchangeOperatorParam*       pExecParam = NULL;
1,448,309✔
2131
    SExternalWindowOperatorParam* pExtPram = (SExternalWindowOperatorParam*)pParam->value;
1,448,309✔
2132

2133
    if (pExtW->pWins) {
1,448,309✔
2134
      taosArrayDestroy(pExtW->pWins);
1,448,309✔
2135
    }
2136

2137
    pExtW->pWins = pExtPram->ExtWins;
1,448,309✔
2138

2139
    TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, taosArrayGetSize(pExtW->pWins)));
1,448,309✔
2140
    pExtPram->ExtWins = NULL;
1,448,309✔
2141
    pExtW->outputWinId = 0;
1,448,309✔
2142
    pExtW->lastWinId = -1;
1,448,309✔
2143
    pExtW->blkWinStartIdx = 0;
1,448,309✔
2144
    pExtW->outWinIdx = 0;
1,448,309✔
2145
    pExtW->lastSKey = INT64_MIN;
1,448,309✔
2146
    pExtW->isDynWindow = true;
1,448,309✔
2147
    pExtW->orgTableTimeRange.skey = INT64_MAX;
1,448,309✔
2148
    pExtW->orgTableTimeRange.ekey = INT64_MIN;
1,448,309✔
2149

2150
    QUERY_CHECK_CONDITION(pOperator->numOfDownstream == 1, code, lino, _exit, TSDB_CODE_INVALID_PARA)
1,448,309✔
2151

2152
    switch (pDownParam->opType) {
1,448,309✔
2153
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
1,448,309✔
2154
        pExecParam = (SExchangeOperatorParam*)((SOperatorParam*)(pOperator->pDownstreamGetParams[0]))->value;
1,448,309✔
2155
        if (!pExecParam->multiParams) {
1,448,309✔
2156
          pExecParam->basic.vgId = pExtW->orgTableVgId;
1,346,229✔
2157
          taosArrayClear(pExecParam->basic.uidList);
1,346,229✔
2158
          QUERY_CHECK_NULL(taosArrayPush(pExecParam->basic.uidList, &pExtW->orgTableUid), code, lino, _exit, terrno)
2,692,458✔
2159
        }
2160
        break;
1,448,309✔
2161
      }
2162
      default:
×
2163
        break;
×
2164
    }
2165

2166
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
1,448,309✔
2167
    pOperator->pOperatorGetParam = NULL;
1,448,309✔
2168
  } else {
2169
    TAOS_CHECK_EXIT(extWinInitWindowList(pExtW, pTaskInfo));
240,885✔
2170
  }
2171

2172
  while (1) {
5,807,854✔
2173
    pExtW->blkWinIdx = -1;
7,497,272✔
2174
    pExtW->blkWinStartSet = false;
7,497,272✔
2175
    pExtW->blkRowStartIdx = 0;
7,497,272✔
2176

2177
    SSDataBlock* pBlock = getNextBlockFromDownstreamRemain(pOperator, 0);
7,497,272✔
2178
    if (pOperator->pDownstreamGetParams) {
7,497,272✔
2179
      pOperator->pDownstreamGetParams[0] = NULL;
4,848,038✔
2180
    }
2181
    if (pBlock == NULL) {
7,497,272✔
2182
      if (EEXT_MODE_AGG == pExtW->mode) {
1,689,418✔
2183
        TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pBlock, true, NULL));
1,688,476✔
2184
      }
2185
      pExtW->blkWinStartIdx = pExtW->pWins->size;
1,689,418✔
2186
      break;
1,689,418✔
2187
    }
2188

2189
    if (pExtW->isDynWindow) {
5,807,854✔
2190
      TSKEY skey = 0;
3,399,729✔
2191
      TSKEY ekey = 0;
3,399,729✔
2192
      code = getTimeWindowOfBlock(pBlock, pExtW->primaryTsIndex, &skey, &ekey);
3,399,729✔
2193
      QUERY_CHECK_CODE(code, lino, _exit);
3,399,729✔
2194
      pExtW->orgTableTimeRange.skey = TMIN(pExtW->orgTableTimeRange.skey, skey);
3,399,729✔
2195
      pExtW->orgTableTimeRange.ekey = TMAX(pExtW->orgTableTimeRange.ekey, ekey);
3,399,729✔
2196
    }
2197

2198
    printDataBlock(pBlock, __func__, pTaskInfo->id.str, pTaskInfo->id.queryId);
5,807,854✔
2199

2200
    qDebug("ext window mode:%d got %" PRId64 " rows from downstream", pExtW->mode, pBlock->info.rows);
5,807,854✔
2201
    
2202
    switch (pExtW->mode) {
5,807,638✔
2203
      case EEXT_MODE_SCALAR:
2,054✔
2204
        TAOS_CHECK_EXIT(extWinProjectOpen(pOperator, pBlock));
2,054✔
2205
        if (extWinNonAggGotResBlock(pExtW)) {
2,054✔
2206
          return code;
×
2207
        }
2208
        break;
2,054✔
2209
      case EEXT_MODE_AGG:
5,805,800✔
2210
        TAOS_CHECK_EXIT(extWinAggOpen(pOperator, pBlock));
5,805,800✔
2211
        break;
5,805,800✔
2212
      case EEXT_MODE_INDEFR_FUNC:
×
2213
        TAOS_CHECK_EXIT(extWinIndefRowsOpen(pOperator, pBlock));
×
2214
        if (extWinNonAggGotResBlock(pExtW)) {
×
2215
          return code;
×
2216
        }
2217
        break;
×
2218
      default:
×
2219
        break;
×
2220
    }
2221
  }
2222

2223
  if (pOperator->pOperatorGetParam) {
1,689,418✔
2224
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
×
2225
    pOperator->pOperatorGetParam = NULL;
×
2226
  }
2227
  OPTR_SET_OPENED(pOperator);
1,689,418✔
2228

2229
#if 0
2230
  if (pExtW->mode == EEXT_MODE_AGG) {
2231
    qDebug("ext window before dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
2232

2233
    code = initGroupedResultInfo(&pExtW->groupResInfo, pExtW->aggSup.pResultRowHashTable, pExtW->binfo.inputTsOrder);
2234
    QUERY_CHECK_CODE(code, lino, _exit);
2235

2236
    qDebug("ext window after dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
2237
  }
2238
#endif
2239

2240
_exit:
1,689,418✔
2241

2242
  if (code != 0) {
1,689,418✔
2243
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
2244
    pTaskInfo->code = code;
×
2245
    T_LONG_JMP(pTaskInfo->env, code);
×
2246
  }
2247
  
2248
  return code;
1,689,418✔
2249
}
2250

2251
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,920,235✔
2252
  int32_t                  code = 0;
1,920,235✔
2253
  int32_t                  lino = 0;
1,920,235✔
2254
  SExternalWindowOperator* pExtW = pOperator->info;
1,920,235✔
2255
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,920,235✔
2256

2257
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
1,920,235✔
2258
    *ppRes = NULL;
×
2259
    return code;
×
2260
  }
2261

2262
  if (pOperator->pOperatorGetParam) {
1,920,235✔
2263
    if (pOperator->status == OP_EXEC_DONE) {
1,448,309✔
2264
      pOperator->status = OP_NOT_OPENED;
46,947✔
2265
    }
2266
  }
2267

2268
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);
1,920,011✔
2269

2270
  if (pOperator->status == OP_NOT_OPENED) {
1,920,235✔
2271
    TAOS_CHECK_EXIT(pOperator->fpSet._openFn(pOperator));
1,689,418✔
2272
  }
2273

2274
  if (pExtW->mode == EEXT_MODE_SCALAR || pExtW->mode == EEXT_MODE_INDEFR_FUNC) {
1,920,235✔
2275
    TAOS_CHECK_EXIT(extWinNonAggOutputRes(pOperator, ppRes));
1,168✔
2276
    if (NULL == *ppRes) {
1,168✔
2277
      setOperatorCompleted(pOperator);
×
2278
      extWinFreeResultRow(pExtW);
×
2279
    }
2280
  } else {
2281
#if 0    
2282
    doBuildResultDatablock(pOperator, &pExtW->binfo, &pExtW->groupResInfo, pExtW->aggSup.pResultBuf);
2283
    bool hasRemain = hasRemainResults(&pExtW->groupResInfo);
2284
    if (!hasRemain) {
2285
      setOperatorCompleted(pOperator);
2286
      break;
2287
    }
2288
    if (pExtW->binfo.pRes->info.rows > 0) break;
2289
#else
2290
    while (1) {
2291
      TAOS_CHECK_EXIT(extWinAggOutputRes(pOperator, ppRes));
1,919,067✔
2292
      if (NULL != *ppRes) {
1,919,067✔
2293
        break;
1,678,900✔
2294
      }
2295

2296
      if (pExtW->outputWinId >= pExtW->pWins->size) {
240,167✔
2297
        setOperatorCompleted(pOperator);
240,167✔
2298
        if (pTaskInfo->pStreamRuntimeInfo) {
240,167✔
2299
          extWinFreeResultRow(pExtW);
240,167✔
2300
        }
2301
        break;
239,943✔
2302
      }
2303
    }
2304
#endif      
2305
  }
2306

2307
  if (*ppRes) {
1,920,011✔
2308
    pOperator->resultInfo.totalRows += (*ppRes)->info.rows;
1,680,068✔
2309
    printDataBlock(*ppRes, __func__, GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
1,680,068✔
2310
  }
2311
  
2312
_exit:
239,943✔
2313

2314
  if (code) {
1,920,235✔
2315
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
×
2316
    pTaskInfo->code = code;
×
2317
    T_LONG_JMP(pTaskInfo->env, code);
×
2318
  }
2319

2320
  if ((*ppRes) && (*ppRes)->info.rows <= 0) {
1,920,235✔
2321
    *ppRes = NULL;
×
2322
  }
2323

2324
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM && (*ppRes)) {
1,920,235✔
2325
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
231,759✔
2326
  }
2327
  
2328
  return code;
1,920,011✔
2329
}
2330

2331

2332
int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
1,539,751✔
2333
                                     SOperatorInfo** pOptrOut) {
2334
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
1,539,751✔
2335
  QRY_PARAM_CHECK(pOptrOut);
1,539,751✔
2336
  int32_t                  code = 0;
1,539,751✔
2337
  int32_t                  lino = 0;
1,539,751✔
2338
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
1,539,751✔
2339
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,539,751✔
2340
  pOperator->pPhyNode = pNode;
1,539,751✔
2341
  if (!pExtW || !pOperator) {
1,539,751✔
2342
    code = terrno;
×
2343
    lino = __LINE__;
×
2344
    goto _error;
×
2345
  }
2346
  
2347
  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
1,539,751✔
2348
                  pExtW, pTaskInfo);
2349
                  
2350
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
1,539,751✔
2351
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,539,751✔
2352
  initBasicInfo(&pExtW->binfo, pResBlock);
1,539,751✔
2353

2354
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
1,539,751✔
2355
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
1,539,751✔
2356
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
1,539,751✔
2357
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
1,539,751✔
2358
  pExtW->isDynWindow = false;
1,539,751✔
2359

2360
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
1,539,751✔
2361
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
138,389✔
2362
  }
2363

2364
  // pExtW->limitInfo = (SLimitInfo){0};
2365
  // initLimitInfo(pPhynode->window.node.pLimit, pPhynode->window.node.pSlimit, &pExtW->limitInfo);
2366

2367
  if (pPhynode->window.pProjs) {
1,539,751✔
2368
    int32_t    numOfScalarExpr = 0;
942✔
2369
    SExprInfo* pScalarExprInfo = NULL;
942✔
2370
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
942✔
2371
    QUERY_CHECK_CODE(code, lino, _error);
942✔
2372

2373
    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
942✔
2374
    QUERY_CHECK_CODE(code, lino, _error);
942✔
2375

2376
  //if (pExtW->multiTableMode) {
2377
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
942✔
2378
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _error);
942✔
2379
  //}
2380
    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
942✔
2381
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
942✔
2382
  } else if (pExtW->mode == EEXT_MODE_AGG) {
1,538,809✔
2383
    if (pPhynode->window.pExprs != NULL) {
1,538,809✔
2384
      int32_t    num = 0;
1,944✔
2385
      SExprInfo* pSExpr = NULL;
1,944✔
2386
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
1,944✔
2387
      QUERY_CHECK_CODE(code, lino, _error);
1,944✔
2388
    
2389
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
1,944✔
2390
      if (code != TSDB_CODE_SUCCESS) {
1,944✔
2391
        goto _error;
×
2392
      }
2393
      checkIndefRowsFuncs(&pExtW->scalarSupp);
1,944✔
2394
    }
2395
    
2396
    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
1,538,809✔
2397
    initResultSizeInfo(&pOperator->resultInfo, 4096);
1,538,809✔
2398
    //code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
2399
    //QUERY_CHECK_CODE(code, lino, _error);
2400

2401
    pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
1,538,809✔
2402
    TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
1,538,809✔
2403
    
2404
    int32_t num = 0;
1,538,809✔
2405
    SExprInfo* pExprInfo = NULL;
1,538,809✔
2406
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
1,538,809✔
2407
    QUERY_CHECK_CODE(code, lino, _error);
1,538,593✔
2408
    pOperator->exprSupp.hasWindow = true;
1,538,593✔
2409
    pOperator->exprSupp.hasWindowOrGroup = true;
1,538,809✔
2410
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, 0, 0);
1,538,809✔
2411
    QUERY_CHECK_CODE(code, lino, _error);
1,538,809✔
2412

2413
    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,538,698✔
2414
                              pTaskInfo->pStreamRuntimeInfo);
1,538,809✔
2415
    QUERY_CHECK_CODE(code, lino, _error);
1,538,809✔
2416

2417
    nodesWalkExprs(pPhynode->window.pFuncs, extWinHasCountLikeFunc, &pExtW->hasCountFunc);
1,538,809✔
2418
    if (pExtW->hasCountFunc) {
1,538,698✔
2419
      code = extWinCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
745,167✔
2420
      QUERY_CHECK_CODE(code, lino, _error);
745,278✔
2421
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
745,278✔
2422
    } else {
2423
      qDebug("%s ext window doesn't have CountLikeFunc", pOperator->pTaskInfo->id.str);
793,531✔
2424
    }
2425

2426
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
1,538,809✔
2427
    QUERY_CHECK_CODE(code, lino, _error);
1,538,809✔
2428

2429
    pExtW->lastSKey = INT64_MIN;
1,538,809✔
2430
  } else {
2431
    size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2432
    
2433
    if (pPhynode->window.pExprs != NULL) {
×
2434
      int32_t    num = 0;
×
2435
      SExprInfo* pSExpr = NULL;
×
2436
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
2437
      QUERY_CHECK_CODE(code, lino, _error);
×
2438
    
2439
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
2440
      if (code != TSDB_CODE_SUCCESS) {
×
2441
        goto _error;
×
2442
      }
2443
    }
2444
    
2445
    int32_t    numOfExpr = 0;
×
2446
    SExprInfo* pExprInfo = NULL;
×
2447
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
×
2448
    TSDB_CHECK_CODE(code, lino, _error);
×
2449
    
2450
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
×
2451
                              pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2452
    TSDB_CHECK_CODE(code, lino, _error);
×
2453
    pOperator->exprSupp.hasWindowOrGroup = false;
×
2454
    
2455
    //code = setFunctionResultOutput(pOperator, &pExtW->binfo, &pExtW->aggSup, MAIN_SCAN, numOfExpr);
2456
    //TSDB_CHECK_CODE(code, lino, _error);
2457
    
2458
    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
×
2459
                              pTaskInfo->pStreamRuntimeInfo);
×
2460
    TSDB_CHECK_CODE(code, lino, _error);
×
2461
    
2462
    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
×
2463
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
×
2464
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
×
2465
    TSDB_CHECK_CODE(code, lino, _error);
×
2466

2467
  //if (pExtW->multiTableMode) {
2468
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
×
2469
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _error);
×
2470
  //}
2471
    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
×
2472
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);  
×
2473
  }
2474

2475
  pExtW->pWins = taosArrayInit(4096, sizeof(SExtWinTimeWindow));
1,539,751✔
2476
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
1,539,751✔
2477
  
2478
  //initResultRowInfo(&pExtW->binfo.resultRowInfo);
2479

2480
  pExtW->timeRangeExpr = (STimeRangeNode*)pPhynode->pTimeRange;
1,539,751✔
2481
  if (pExtW->timeRangeExpr) {
1,539,751✔
2482
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pStart, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
138,389✔
2483
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pEnd, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
138,389✔
2484
  }
2485

2486
  if (pPhynode->isSingleTable) {
1,539,751✔
2487
    pExtW->getWinFp = (pExtW->timeRangeExpr && (pExtW->timeRangeExpr->needCalc || (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP))) ? extWinGetOvlpWin : extWinGetNoOvlpWin;
1,346,647✔
2488
    pExtW->multiTableMode = false;
1,346,647✔
2489
  } else {
2490
    pExtW->getWinFp = (pExtW->timeRangeExpr && (pExtW->timeRangeExpr->needCalc || (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP))) ? extWinGetMultiTbOvlpWin : extWinGetMultiTbNoOvlpWin;
193,104✔
2491
    pExtW->multiTableMode = true;
193,104✔
2492
  }
2493
  pExtW->inputHasOrder = pPhynode->inputHasOrder;
1,539,751✔
2494
  pExtW->orgTableUid = pPhynode->orgTableUid;
1,539,751✔
2495
  pExtW->orgTableVgId = pPhynode->orgTableVgId;
1,539,751✔
2496

2497
  pOperator->fpSet = createOperatorFpSet(extWinOpen, extWinNext, NULL, destroyExternalWindowOperatorInfo,
1,539,751✔
2498
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2499
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
1,539,751✔
2500
  code = appendDownstream(pOperator, &pDownstream, 1);
1,539,751✔
2501
  if (code != 0) {
1,539,751✔
2502
    goto _error;
×
2503
  }
2504

2505
  *pOptrOut = pOperator;
1,539,751✔
2506
  return code;
1,539,751✔
2507

2508
_error:
×
2509

2510
  if (pExtW != NULL) {
×
2511
    destroyExternalWindowOperatorInfo(pExtW);
×
2512
  }
2513

2514
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
2515
  pTaskInfo->code = code;
×
2516
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
×
2517
  return code;
×
2518
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc