• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4934

21 Jan 2026 06:06AM UTC coverage: 66.691% (+0.02%) from 66.671%
#4934

push

travis-ci

web-flow
 enh:stmt support interval opt (#34335)

0 of 26 new or added lines in 3 files covered. (0.0%)

562 existing lines in 98 files now uncovered.

203203 of 304692 relevant lines covered (66.69%)

129902664.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.63
/source/libs/executor/src/externalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22
#include "cmdnodes.h"
23

24
typedef struct SBlockList {
25
  const SSDataBlock* pSrcBlock;
26
  SList*             pBlocks;
27
  int32_t            blockRowNumThreshold;
28
} SBlockList;
29

30

31
typedef int32_t (*extWinGetWinFp)(SOperatorInfo*, int64_t*, int32_t*, SDataBlockInfo*, SExtWinTimeWindow**, int32_t*);
32

33
typedef struct SExtWindowStat {
34
  int64_t resBlockCreated;
35
  int64_t resBlockDestroyed;
36
  int64_t resBlockRecycled;
37
  int64_t resBlockReused;
38
  int64_t resBlockAppend;
39
} SExtWindowStat;
40

41
typedef struct SExternalWindowOperator {
42
  SOptrBasicInfo     binfo;
43
  SExprSupp          scalarSupp;
44
  int32_t            primaryTsIndex;
45
  EExtWinMode        mode;
46
  bool               multiTableMode;
47
  bool               inputHasOrder;
48
  SArray*            pWins;           // SArray<SExtWinTimeWindow>
49
  SArray*            pPseudoColInfo;  
50
  STimeRangeNode*    timeRangeExpr;
51
  
52
  extWinGetWinFp     getWinFp;
53

54
  bool               blkWinStartSet;
55
  int32_t            blkWinStartIdx;
56
  int32_t            blkWinIdx;
57
  int32_t            blkRowStartIdx;
58
  int32_t            outputWinId;
59
  int32_t            outputWinNum;
60
  int32_t            outWinIdx;
61

62
  // for project&indefRows
63
  SList*             pFreeBlocks;    // SList<SSDatablock*+SAarray*>
64
  SArray*            pOutputBlocks;  // SArray<SList*>, for each window, we have a list of blocks
65
  SListNode*         pLastBlkNode; 
66
  SSDataBlock*       pTmpBlock;
67
  
68
  // for agg
69
  SAggSupporter      aggSup;
70
  STimeWindowAggSupp twAggSup;
71

72
  int32_t            resultRowCapacity;
73
  SResultRow*        pResultRow;
74

75
  int64_t            lastSKey;
76
  int32_t            lastWinId;
77
  SSDataBlock*       pEmptyInputBlock;
78
  bool               hasCountFunc;
79
  SExtWindowStat     stat;
80
  SArray*            pWinRowIdx;
81

82
  // for vtable window query
83
  bool               isDynWindow;
84
  int32_t            orgTableVgId;
85
  tb_uid_t           orgTableUid;
86
  STimeWindow        orgTableTimeRange;
87
} SExternalWindowOperator;
88

89

90
static int32_t extWinBlockListAddBlock(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
490✔
91
  SSDataBlock* pRes = NULL;
490✔
92
  int32_t code = 0, lino = 0;
490✔
93

94
  if (listNEles(pExtW->pFreeBlocks) > 0) {
490✔
95
    SListNode* pNode = tdListPopHead(pExtW->pFreeBlocks);
×
96
    *ppBlock = *(SSDataBlock**)pNode->data;
×
97
    *ppIdx = *(SArray**)((SArray**)pNode->data + 1);
×
98
    tdListAppendNode(pList, pNode);
×
99
    pExtW->stat.resBlockReused++;
×
100
  } else {
101
    TAOS_CHECK_EXIT(createOneDataBlock(pExtW->binfo.pRes, false, &pRes));
490✔
102
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, TMAX(rows, 4096)));
490✔
103
    SArray* pIdx = taosArrayInit(10, sizeof(int64_t));
490✔
104
    TSDB_CHECK_NULL(pIdx, code, lino, _exit, terrno);
490✔
105
    void* res[2] = {pRes, pIdx};
490✔
106
    TAOS_CHECK_EXIT(tdListAppend(pList, res));
490✔
107

108
    *ppBlock = pRes;
490✔
109
    *ppIdx = pIdx;
490✔
110
    pExtW->stat.resBlockCreated++;
490✔
111
  }
112
  
113
_exit:
490✔
114

115
  if (code) {
490✔
116
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
117
    blockDataDestroy(pRes);
×
118
  }
119
  
120
  return code;
490✔
121
}
122

123
static int32_t extWinGetLastBlockFromList(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
1,147✔
124
  int32_t    code = 0, lino = 0;
1,147✔
125
  SSDataBlock* pRes = NULL;
1,147✔
126

127
  SListNode* pNode = TD_DLIST_TAIL(pList);
1,147✔
128
  if (NULL == pNode) {
1,147✔
129
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
490✔
130
    return code;
490✔
131
  }
132

133
  pRes = *(SSDataBlock**)pNode->data;
657✔
134
  if ((pRes->info.rows + rows) > pRes->info.capacity) {
657✔
135
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
×
136
    return code;
×
137
  }
138

139
  *ppIdx = *(SArray**)((SSDataBlock**)pNode->data + 1);
657✔
140
  *ppBlock = pRes;
657✔
141
  pExtW->stat.resBlockAppend++;
657✔
142

143
_exit:
657✔
144

145
  if (code) {
657✔
146
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
147
  }
148
  
149
  return code;
657✔
150
}
151

152
static void extWinDestroyBlockList(void* p) {
11,100,160✔
153
  if (NULL == p) {
11,100,160✔
154
    return;
×
155
  }
156

157
  SListNode* pTmp = NULL;
11,100,160✔
158
  SList** ppList = (SList**)p;
11,100,160✔
159
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
11,100,160✔
160
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
161
    while (pNode) {
×
162
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
163
      blockDataDestroy(pBlock);
×
164
      SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
×
165
      taosArrayDestroy(pIdx);
×
166
      pTmp = pNode;
×
167
      pNode = pNode->dl_next_;
×
168
      taosMemoryFree(pTmp);
×
169
    }
170
  }
171
  taosMemoryFree(*ppList);
11,100,160✔
172
}
173

174

175
static void extWinRecycleBlkNode(SExternalWindowOperator* pExtW, SListNode** ppNode) {
1,357,307✔
176
  if (NULL == ppNode || NULL == *ppNode) {
1,357,307✔
177
    return;
1,357,510✔
178
  }
179

180
  SSDataBlock* pBlock = *(SSDataBlock**)(*ppNode)->data;
5✔
181
  SArray* pIdx = *(SArray**)((SArray**)(*ppNode)->data + 1);
219✔
182
  
183
  if (listNEles(pExtW->pFreeBlocks) >= 10) {
219✔
184
    blockDataDestroy(pBlock);
×
185
    taosArrayDestroy(pIdx);
×
186
    taosMemoryFreeClear(*ppNode);
×
187
    pExtW->stat.resBlockDestroyed++;
×
188
    return;
×
189
  }
190
  
191
  blockDataCleanup(pBlock);
219✔
192
  taosArrayClear(pIdx);
219✔
193
  tdListPrependNode(pExtW->pFreeBlocks, *ppNode);
219✔
194
  *ppNode = NULL;
219✔
195
  pExtW->stat.resBlockRecycled++;
219✔
196
}
197

198
static void extWinRecycleBlockList(SExternalWindowOperator* pExtW, void* p) {
490✔
199
  if (NULL == p) {
490✔
200
    return;
×
201
  }
202

203
  SListNode* pTmp = NULL;
490✔
204
  SList** ppList = (SList**)p;
490✔
205
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
490✔
206
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
207
    while (pNode) {
×
208
      pTmp = pNode;
×
209
      pNode = pNode->dl_next_;
×
210
      extWinRecycleBlkNode(pExtW, &pTmp);
×
211
    }
212
  }
213
  taosMemoryFree(*ppList);
490✔
214
}
215
static void extWinDestroyBlkNode(SExternalWindowOperator* pInfo, SListNode* pNode) {
843,330✔
216
  if (NULL == pNode) {
843,330✔
217
    return;
842,840✔
218
  }
219

220
  SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
490✔
221
  SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
490✔
222
  
223
  blockDataDestroy(pBlock);
490✔
224
  taosArrayDestroy(pIdx);
490✔
225

226
  taosMemoryFree(pNode);
490✔
227

228
  pInfo->stat.resBlockDestroyed++;
490✔
229
}
230

231

232
void destroyExternalWindowOperatorInfo(void* param) {
843,111✔
233
  if (NULL == param) {
843,111✔
234
    return;
×
235
  }
236
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
843,111✔
237
  cleanupBasicInfo(&pInfo->binfo);
843,111✔
238

239
  taosArrayDestroyEx(pInfo->pOutputBlocks, extWinDestroyBlockList);
843,111✔
240
  taosArrayDestroy(pInfo->pWins);
843,111✔
241
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
843,111✔
242
  taosArrayDestroy(pInfo->pWinRowIdx);
843,111✔
243
  
244
  taosArrayDestroy(pInfo->pPseudoColInfo);
843,111✔
245
  blockDataDestroy(pInfo->pTmpBlock);
843,111✔
246
  blockDataDestroy(pInfo->pEmptyInputBlock);
843,111✔
247

248
  extWinDestroyBlkNode(pInfo, pInfo->pLastBlkNode);
843,111✔
249
  if (pInfo->pFreeBlocks) {
843,111✔
250
    SListNode *node;
251
    while ((node = TD_DLIST_HEAD(pInfo->pFreeBlocks)) != NULL) {
490✔
252
      TD_DLIST_POP(pInfo->pFreeBlocks, node);
219✔
253
      extWinDestroyBlkNode(pInfo, node);
219✔
254
    }
255
    taosMemoryFree(pInfo->pFreeBlocks);
271✔
256
  }
257
  
258
  cleanupAggSup(&pInfo->aggSup);
843,111✔
259
  cleanupExprSupp(&pInfo->scalarSupp);
843,111✔
260
  taosMemoryFreeClear(pInfo->pResultRow);
843,111✔
261

262
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
843,111✔
263

264
  qDebug("ext window stat at destroy, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
843,111✔
265
      pInfo->stat.resBlockCreated, pInfo->stat.resBlockDestroyed, pInfo->stat.resBlockRecycled, 
266
      pInfo->stat.resBlockReused, pInfo->stat.resBlockAppend);
267

268
  taosMemoryFreeClear(pInfo);
843,111✔
269
}
270

271
static int32_t extWinOpen(SOperatorInfo* pOperator);
272
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
273

274
typedef struct SMergeAlignedExternalWindowOperator {
275
  SExternalWindowOperator* pExtW;
276
  int64_t curTs;
277
  SResultRow*  pResultRow;
278
} SMergeAlignedExternalWindowOperator;
279

280
void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
2,128✔
281
  SMergeAlignedExternalWindowOperator* pMlExtInfo = (SMergeAlignedExternalWindowOperator*)pOperator;
2,128✔
282
  destroyExternalWindowOperatorInfo(pMlExtInfo->pExtW);
2,128✔
283
  taosMemoryFreeClear(pMlExtInfo);
2,128✔
284
}
2,128✔
285

286
int64_t* extWinExtractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
4,284,526✔
287
  TSKEY* tsCols = NULL;
4,284,526✔
288

289
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
4,284,526✔
290
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
4,284,948✔
291
    if (!pColDataInfo) {
4,284,948✔
292
      pTaskInfo->code = terrno;
×
293
      T_LONG_JMP(pTaskInfo->env, terrno);
×
294
    }
295

296
    tsCols = (int64_t*)pColDataInfo->pData;
4,284,948✔
297
    if (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0) {
4,284,740✔
298
      int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
4,284,734✔
299
      if (code != TSDB_CODE_SUCCESS) {
4,284,743✔
UNCOV
300
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
301
        pTaskInfo->code = code;
×
302
        T_LONG_JMP(pTaskInfo->env, code);
×
303
      }
304
    }
305
  }
306

307
  return tsCols;
4,284,535✔
308
}
309

310
static int32_t extWinGetCurWinIdx(SExecTaskInfo* pTaskInfo) {
2,147,483,647✔
311
  if (!pTaskInfo->pStreamRuntimeInfo) {
2,147,483,647✔
312
    return 0;
2,147,483,647✔
313
  }
314
  return pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
52,142,771✔
315
}
316

317
static void extWinIncCurWinIdx(SOperatorInfo* pOperator) {
×
318
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
319
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx++;
×
320
}
×
321

322
static void extWinSetCurWinIdx(SOperatorInfo* pOperator, int32_t idx) {
2,147,483,647✔
323
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,147,483,647✔
324
  if (pTaskInfo->pStreamRuntimeInfo) {
2,147,483,647✔
325
    pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
38,164,162✔
326
  }
327
}
2,147,483,647✔
328

329

330
static void extWinIncCurWinOutIdx(SStreamRuntimeInfo* pStreamRuntimeInfo) {
490✔
331
  pStreamRuntimeInfo->funcInfo.curOutIdx++;
490✔
332
}
490✔
333

334

335
static const STimeWindow* extWinGetNextWin(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
×
336
  int32_t curIdx = extWinGetCurWinIdx(pTaskInfo);
×
337
  if (curIdx + 1 >= pExtW->pWins->size) return NULL;
×
338
  return taosArrayGet(pExtW->pWins, curIdx + 1);
×
339
}
340

341

342
static int32_t extWinAppendWinIdx(SExecTaskInfo*       pTaskInfo, SArray* pIdx, SSDataBlock* pBlock, int32_t currWinIdx, int32_t rows) {
2,147,483,647✔
343
  int32_t  code = 0, lino = 0;
2,147,483,647✔
344
  int64_t* lastRes = taosArrayGetLast(pIdx);
2,147,483,647✔
345
  int32_t* lastWinIdx = (int32_t*)lastRes;
2,147,483,647✔
346
  int32_t* lastRowIdx = lastWinIdx ? (lastWinIdx + 1) : NULL;
2,147,483,647✔
347
  int64_t  res = 0;
2,147,483,647✔
348
  int32_t* pWinIdx = (int32_t*)&res;
2,147,483,647✔
349
  int32_t* pRowIdx = pWinIdx + 1;
2,147,483,647✔
350

351
  if (lastWinIdx && *lastWinIdx == currWinIdx) {
2,147,483,647✔
352
    return code;
2,147,483,647✔
353
  }
354

355
  *pWinIdx = currWinIdx;
14,735,263✔
356
  *pRowIdx = pBlock->info.rows - rows;
14,734,845✔
357

358
  TSDB_CHECK_NULL(taosArrayPush(pIdx, &res), code, lino, _exit, terrno);
14,725,858✔
359

360
_exit:
14,725,858✔
361

362
  if (code) {
14,726,067✔
363
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
364
  }
365

366
  return code;
14,725,858✔
367
}
368

369

370
static int32_t mergeAlignExtWinSetOutputBuf(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
2,515,387✔
371
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
372
  if (*pResult == NULL) {
2,515,387✔
373
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
2,550✔
374
    if (!*pResult) {
2,550✔
375
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
376
      return terrno;
×
377
    }
378
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
2,550✔
379
  }
380
  
381
  (*pResult)->win = *pWin;
2,515,387✔
382
  (*pResult)->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,515,178✔
383
  
384
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
2,514,969✔
385
}
386

387

388
static int32_t mergeAlignExtWinGetWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts, STimeWindow** ppWin) {
2,510,789✔
389
  int32_t blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,510,789✔
390
  
391
  // TODO handle desc order
392
  for (int32_t i = blkWinIdx; i < pExtW->pWins->size; ++i) {
5,008,787✔
393
    STimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
5,004,816✔
394
    if (ts == pWin->skey) {
5,002,099✔
395
      extWinSetCurWinIdx(pOperator, i);
2,507,445✔
396
      *ppWin = pWin;
2,508,281✔
397
      return TSDB_CODE_SUCCESS;
2,508,699✔
398
    } else if (ts < pWin->skey) {
2,500,506✔
399
      qError("invalid ts %" PRId64 " for current window idx %d skey %" PRId64, ts, i, pWin->skey);
627✔
400
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
401
    }
402
  }
403
  
404
  qError("invalid ts %" PRId64 " to find merge aligned ext window, size:%d", ts, (int32_t)pExtW->pWins->size);
×
405
  return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
406
}
407

408
static int32_t mergeAlignExtWinFinalizeResult(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pResultBlock) {
2,509,535✔
409
  int32_t        code = 0, lino = 0;
2,509,535✔
410
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
2,509,535✔
411
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
2,509,535✔
412
  SExprSupp*     pSup = &pOperator->exprSupp;
2,509,744✔
413
  SResultRow*  pResultRow = pMlExtInfo->pResultRow;
2,509,953✔
414
  
415
  finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pOperator->pTaskInfo);
2,509,744✔
416
  
417
  if (pResultRow->numOfRows > 0) {
2,508,072✔
418
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResultBlock, pResultRow->winIdx, pResultRow->numOfRows));
2,508,072✔
419
  }
420

421
_exit:
2,504,310✔
422

423
  if (code) {
2,504,310✔
424
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
425
  }
426

427
  return code;
2,504,310✔
428
}
429

430
static int32_t mergeAlignExtWinAggDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
9,447✔
431
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
9,447✔
432
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
9,447✔
433

434
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
9,447✔
435
  SExprSupp*     pSup = &pOperator->exprSupp;
9,447✔
436
  int32_t        code = 0, lino = 0;
9,447✔
437
  STimeWindow *pWin = NULL;
9,447✔
438

439
  int32_t startPos = 0;
9,447✔
440
  int64_t* tsCols = extWinExtractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
9,447✔
441
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
9,447✔
442
  
443
  code = mergeAlignExtWinGetWinFromTs(pOperator, pExtW, ts, &pWin);
9,447✔
444
  if (code) {
9,447✔
445
    qError("failed to get time window for ts:%" PRId64 ", prim ts index:%d, error:%s", ts, pExtW->primaryTsIndex, tstrerror(code));
×
446
    TAOS_CHECK_EXIT(code);
×
447
  }
448

449
  if (pMlExtInfo->curTs != INT64_MIN && pMlExtInfo->curTs != pWin->skey) {
9,447✔
450
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
1,881✔
451
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
1,881✔
452
  }
453
  
454
  TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));
9,447✔
455

456
  int32_t currPos = startPos;
9,447✔
457
  pMlExtInfo->curTs = pWin->skey;
9,447✔
458
  
459
  while (++currPos < pBlock->info.rows) {
7,527,894✔
460
    if (tsCols[currPos] == pMlExtInfo->curTs) continue;
7,518,447✔
461

462
    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
2,506,149✔
463
      pMlExtInfo->curTs, startPos, currPos, tsCols[currPos]); 
464
    TAOS_CHECK_EXIT(applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
2,506,149✔
465
                                           currPos - startPos, pBlock->info.rows, pSup->numOfExprs));
466

467
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
2,504,895✔
468
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
2,499,670✔
469

470
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[currPos], &pWin));
2,500,297✔
471
    
472
    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
2,499,043✔
473
    startPos = currPos;
2,505,940✔
474
    
475
    TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));
2,505,940✔
476

477
    pMlExtInfo->curTs = pWin->skey;
2,506,149✔
478
  }
479

480
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
18,894✔
481
                                         currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
9,447✔
482

483
_exit:
9,447✔
484

485
  if (code != 0) {
9,447✔
486
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
487
    T_LONG_JMP(pTaskInfo->env, code);
×
488
  }
489
  
490
  return code;
9,447✔
491
}
492

493
static int32_t mergeAlignExtWinBuildWinRowIdx(SOperatorInfo* pOperator, SSDataBlock* pInput, SSDataBlock* pResult) {
×
494
  SExternalWindowOperator* pExtW = pOperator->info;
×
495
  int64_t* tsCols = extWinExtractTsCol(pInput, pExtW->primaryTsIndex, pOperator->pTaskInfo);
×
496
  STimeWindow* pWin = NULL;
×
497
  int32_t code = 0, lino = 0;
×
498
  int64_t prevTs = INT64_MIN;
×
499
  
500
  for (int32_t i = 0; i < pInput->info.rows; ++i) {
×
501
    if (prevTs == tsCols[i]) {
×
502
      continue;
×
503
    }
504
    
505
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[i], &pWin));
×
506
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResult, extWinGetCurWinIdx(pOperator->pTaskInfo), pInput->info.rows - i));
×
507

508
    prevTs = tsCols[i];
×
509
  }
510

511
_exit:
×
512

513
  if (code != 0) {
×
514
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
515
  }
516

517
  return code;  
×
518
}
519

520
static int32_t mergeAlignExtWinProjectDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
521
                                            SSDataBlock* pResultBlock) {
522
  SExternalWindowOperator* pExtW = pOperator->info;
×
523
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
×
524
  int32_t                  code = 0, lino = 0;
×
525
  
526
  TAOS_CHECK_EXIT(projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
×
527
                        GET_STM_RTINFO(pOperator->pTaskInfo)));
528

529
  TAOS_CHECK_EXIT(mergeAlignExtWinBuildWinRowIdx(pOperator, pBlock, pResultBlock));
×
530

531
_exit:
×
532

533
  if (code != 0) {
×
534
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
535
  }
536

537
  return code;
×
538
}
539

540
void mergeAlignExtWinDo(SOperatorInfo* pOperator) {
3,599✔
541
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
3,599✔
542
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
3,599✔
543
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
3,599✔
544
  SResultRow*                          pResultRow = NULL;
3,599✔
545
  int32_t                              code = 0;
3,599✔
546
  SSDataBlock*                         pRes = pExtW->binfo.pRes;
3,599✔
547
  SExprSupp*                           pSup = &pOperator->exprSupp;
3,599✔
548
  int32_t                              lino = 0;
3,599✔
549

550
  taosArrayClear(pExtW->pWinRowIdx);
3,599✔
551
  blockDataCleanup(pRes);
3,599✔
552

553
  while (1) {
8,820✔
554
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
12,419✔
555

556
    if (pBlock == NULL) {
12,419✔
557
      // close last time window
558
      if (pMlExtInfo->curTs != INT64_MIN && EEXT_MODE_AGG == pExtW->mode) {
2,972✔
559
        TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, &pExtW->binfo.resultRowInfo, pRes));
2,550✔
560
      }
561
      setOperatorCompleted(pOperator);
2,972✔
562
      break;
2,972✔
563
    }
564

565
    pRes->info.scanFlag = pBlock->info.scanFlag;
9,447✔
566
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
9,447✔
567
    QUERY_CHECK_CODE(code, lino, _exit);
9,447✔
568

569
    printDataBlock(pBlock, __func__, "externalwindowAlign", pTaskInfo->id.queryId);
9,447✔
570
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);
9,447✔
571

572
    if (EEXT_MODE_SCALAR == pExtW->mode) {
9,447✔
573
      TAOS_CHECK_EXIT(mergeAlignExtWinProjectDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
×
574
    } else {
575
      TAOS_CHECK_EXIT(mergeAlignExtWinAggDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
9,447✔
576
    }
577

578
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
9,447✔
579
      break;
627✔
580
    }
581
  }
582

583
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;
3,599✔
584
  
585
_exit:
3,599✔
586

587
  if (code != 0) {
3,599✔
588
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
589
    pTaskInfo->code = code;
×
590
    T_LONG_JMP(pTaskInfo->env, code);
×
591
  }
592
}
3,599✔
593

594
static int32_t mergeAlignExtWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
6,149✔
595
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
6,149✔
596
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
6,149✔
597
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
6,149✔
598
  int32_t                              code = 0;
6,149✔
599
  int32_t lino = 0;
6,149✔
600

601
  if (pOperator->status == OP_EXEC_DONE) {
6,149✔
602
    (*ppRes) = NULL;
2,550✔
603
    return TSDB_CODE_SUCCESS;
2,550✔
604
  }
605

606
  SSDataBlock* pRes = pExtW->binfo.pRes;
3,599✔
607
  blockDataCleanup(pRes);
3,599✔
608

609
  if (taosArrayGetSize(pExtW->pWins) <= 0) {
3,599✔
610
    size_t size = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
2,972✔
611
    STimeWindow* pWin = taosArrayReserve(pExtW->pWins, size);
2,972✔
612
    TSDB_CHECK_NULL(pWin, code, lino, _exit, terrno);
2,972✔
613

614
    for (int32_t i = 0; i < size; ++i) {
2,513,974✔
615
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
2,511,002✔
616
      pWin[i].skey = pParam->wstart;
2,511,002✔
617
      pWin[i].ekey = pParam->wstart + 1;
2,511,002✔
618
    }
619
    
620
    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
2,972✔
621
  }
622

623
  mergeAlignExtWinDo(pOperator);
3,599✔
624
  
625
  size_t rows = pRes->info.rows;
3,599✔
626
  pOperator->resultInfo.totalRows += rows;
3,599✔
627
  (*ppRes) = (rows == 0) ? NULL : pRes;
3,599✔
628

629
_exit:
3,599✔
630

631
  if (code != 0) {
3,599✔
632
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
633
    pTaskInfo->code = code;
×
634
    T_LONG_JMP(pTaskInfo->env, code);
×
635
  }
636
  return code;
3,599✔
637
}
638

639
int32_t resetMergeAlignedExtWinOperator(SOperatorInfo* pOperator) {
3,816✔
640
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
3,816✔
641
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
3,816✔
642
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
3,816✔
643
  SMergeAlignedIntervalPhysiNode * pPhynode = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;
3,816✔
644
  pOperator->status = OP_NOT_OPENED;
3,816✔
645

646
  taosArrayClear(pExtW->pWins);
3,816✔
647

648
  resetBasicOperatorState(&pExtW->binfo);
3,816✔
649
  pMlExtInfo->pResultRow = NULL;
3,816✔
650
  pMlExtInfo->curTs = INT64_MIN;
3,816✔
651

652
  int32_t code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
7,632✔
653
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
3,816✔
654
                             &pTaskInfo->storageAPI.functionStore);
655
  if (code == 0) {
3,816✔
656
    colDataDestroy(&pExtW->twAggSup.timeWindowData);
3,816✔
657
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
3,816✔
658
  }
659
  return code;
3,816✔
660
}
661

662
int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
2,128✔
663
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
664
  SMergeAlignedIntervalPhysiNode* pPhynode = (SMergeAlignedIntervalPhysiNode*)pNode;
2,128✔
665
  int32_t code = 0;
2,128✔
666
  int32_t lino = 0;
2,128✔
667
  SMergeAlignedExternalWindowOperator* pMlExtInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
2,128✔
668
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,128✔
669

670
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
2,128✔
671
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
2,128✔
672
  }
673
  pOperator->pPhyNode = pNode;
2,128✔
674
  if (!pMlExtInfo || !pOperator) {
2,128✔
675
    code = terrno;
×
676
    goto _error;
×
677
  }
678

679
  pMlExtInfo->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
2,128✔
680
  if (!pMlExtInfo->pExtW) {
2,128✔
681
    code = terrno;
×
682
    goto _error;
×
683
  }
684

685
  SExternalWindowOperator* pExtW = pMlExtInfo->pExtW;
2,128✔
686
  SExprSupp* pSup = &pOperator->exprSupp;
2,128✔
687
  pSup->hasWindowOrGroup = true;
2,128✔
688
  pSup->hasWindow = true;
2,128✔
689
  pMlExtInfo->curTs = INT64_MIN;
2,128✔
690

691
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
2,128✔
692
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
2,128✔
693
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
2,128✔
694
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
2,128✔
695

696
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,128✔
697
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2,128✔
698

699
  int32_t num = 0;
2,128✔
700
  SExprInfo* pExprInfo = NULL;
2,128✔
701
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
2,128✔
702
  QUERY_CHECK_CODE(code, lino, _error);
2,128✔
703

704
  if (pExtW->mode == EEXT_MODE_AGG) {
2,128✔
705
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
2,128✔
706
                      &pTaskInfo->storageAPI.functionStore);
707
    QUERY_CHECK_CODE(code, lino, _error);
2,128✔
708
  }
709

710
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
2,128✔
711
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,128✔
712
  initBasicInfo(&pExtW->binfo, pResBlock);
2,128✔
713

714
  pExtW->pWins = taosArrayInit(4096, sizeof(STimeWindow));
2,128✔
715
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
2,128✔
716

717
  pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
2,128✔
718
  TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
2,128✔
719

720
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
2,128✔
721
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
2,128✔
722
  QUERY_CHECK_CODE(code, lino, _error);
2,128✔
723
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMlExtInfo, pTaskInfo);
2,128✔
724
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignExtWinNext, NULL,
2,128✔
725
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
726
                                         optrDefaultGetNextExtFn, NULL);
727
  setOperatorResetStateFn(pOperator, resetMergeAlignedExtWinOperator);
2,128✔
728

729
  code = appendDownstream(pOperator, &pDownstream, 1);
2,128✔
730
  QUERY_CHECK_CODE(code, lino, _error);
2,128✔
731
  *ppOptrOut = pOperator;
2,128✔
732
  return code;
2,128✔
733
  
734
_error:
×
735
  if (pMlExtInfo) destroyMergeAlignedExternalWindowOperator(pMlExtInfo);
×
736
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
737
  pTaskInfo->code = code;
×
738
  return code;
×
739
}
740

741
static int32_t resetExternalWindowExprSupp(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
241,023✔
742
                                           SExternalWindowPhysiNode* pPhynode) {
743
  int32_t    code = 0, lino = 0, num = 0;
241,023✔
744
  SExprInfo* pExprInfo = NULL;
241,023✔
745
  cleanupExprSuppWithoutFilter(&pExtW->scalarSupp);
241,023✔
746

747
  SNodeList* pNodeList = NULL;
241,023✔
748
  if (pPhynode->window.pProjs) {
241,023✔
749
    pNodeList = pPhynode->window.pProjs;
×
750
  } else {
751
    pNodeList = pPhynode->window.pExprs;
241,023✔
752
  }
753

754
  code = createExprInfo(pNodeList, NULL, &pExprInfo, &num);
241,023✔
755
  QUERY_CHECK_CODE(code, lino, _error);
241,023✔
756
  code = initExprSupp(&pExtW->scalarSupp, pExprInfo, num, &pTaskInfo->storageAPI.functionStore);
241,023✔
757
  QUERY_CHECK_CODE(code, lino, _error);
241,023✔
758
  return code;
241,023✔
759
_error:
×
760
  if (code != TSDB_CODE_SUCCESS) {
×
761
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
762
    pTaskInfo->code = code;
×
763
  }
764
  return code;
×
765
}
766

767
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
241,023✔
768
  int32_t code = 0, lino = 0;
241,023✔
769
  SExternalWindowOperator* pExtW = pOperator->info;
241,023✔
770
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
241,023✔
771
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pOperator->pPhyNode;
241,023✔
772
  pOperator->status = OP_NOT_OPENED;
241,023✔
773

774
  //resetBasicOperatorState(&pExtW->binfo);
775
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
241,023✔
776

777
  pExtW->outputWinId = 0;
241,023✔
778
  pExtW->lastWinId = -1;
241,023✔
779
  pExtW->outputWinNum = 0;
241,023✔
780
  taosArrayClear(pExtW->pWins);
241,023✔
781
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);
241,023✔
782

783
/*
784
  int32_t code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
785
  if (code == 0) {
786
    code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
787
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
788
                       &pTaskInfo->storageAPI.functionStore);
789
  }
790
*/
791
  TAOS_CHECK_EXIT(resetExternalWindowExprSupp(pExtW, pTaskInfo, pPhynode));
241,023✔
792
  colDataDestroy(&pExtW->twAggSup.timeWindowData);
241,023✔
793
  TAOS_CHECK_EXIT(initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window));
241,023✔
794

795
  pExtW->outWinIdx = 0;
241,023✔
796
  pExtW->lastSKey = INT64_MIN;
241,023✔
797
  pExtW->isDynWindow = false;
241,023✔
798

799
  qDebug("%s ext window stat at reset, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
241,023✔
800
      pTaskInfo->id.str, pExtW->stat.resBlockCreated, pExtW->stat.resBlockDestroyed, pExtW->stat.resBlockRecycled, 
801
      pExtW->stat.resBlockReused, pExtW->stat.resBlockAppend);
802

803
_exit:
7,482✔
804

805
  if (code) {
241,023✔
806
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
807
  }
808
  
809
  return code;
241,023✔
810
}
811

812
static EDealRes extWinHasCountLikeFunc(SNode* pNode, void* res) {
3,799,884✔
813
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
3,799,884✔
814
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
1,401,931✔
815
    if (fmIsCountLikeFunc(pFunc->funcId) || (pFunc->hasOriginalFunc && fmIsCountLikeFunc(pFunc->originalFuncId))) {
1,401,931✔
816
      *(bool*)res = true;
435,611✔
817
      return DEAL_RES_END;
435,611✔
818
    }
819
  }
820
  return DEAL_RES_CONTINUE;
3,367,042✔
821
}
822

823

824
static int32_t extWinCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
435,611✔
825
  int32_t code = TSDB_CODE_SUCCESS;
435,611✔
826
  int32_t lino = 0;
435,611✔
827
  SSDataBlock* pBlock = NULL;
435,611✔
828
  if (!tsCountAlwaysReturnValue) {
435,611✔
829
    return TSDB_CODE_SUCCESS;
×
830
  }
831

832
  SExternalWindowOperator* pExtW = pOperator->info;
435,611✔
833

834
  if (!pExtW->hasCountFunc) {
435,611✔
835
    return TSDB_CODE_SUCCESS;
×
836
  }
837

838
  code = createDataBlock(&pBlock);
435,611✔
839
  if (code) {
435,611✔
840
    return code;
×
841
  }
842

843
  pBlock->info.rows = 1;
435,611✔
844
  pBlock->info.capacity = 0;
435,611✔
845

846
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
1,847,764✔
847
    SColumnInfoData colInfo = {0};
1,412,153✔
848
    colInfo.hasNull = true;
1,412,153✔
849
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
1,412,153✔
850
    colInfo.info.bytes = 1;
1,412,153✔
851

852
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
1,412,153✔
853
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
2,837,527✔
854
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
1,425,374✔
855
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
1,425,374✔
856
        int32_t slotId = pFuncParam->pCol->slotId;
1,336,232✔
857
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1,336,232✔
858
        if (slotId >= numOfCols) {
1,336,232✔
859
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
1,038,615✔
860
          QUERY_CHECK_CODE(code, lino, _end);
1,038,615✔
861

862
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
2,453,381✔
863
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
1,414,766✔
864
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,414,766✔
865
          }
866
        }
867
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
89,142✔
868
        // do nothing
869
      }
870
    }
871
  }
872

873
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
435,611✔
874
  QUERY_CHECK_CODE(code, lino, _end);
435,611✔
875

876
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
1,850,377✔
877
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
1,414,766✔
878
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
1,414,766✔
879
    colDataSetNULL(pColInfoData, 0);
880
  }
881
  *ppBlock = pBlock;
435,611✔
882

883
_end:
435,611✔
884
  if (code != TSDB_CODE_SUCCESS) {
435,611✔
885
    blockDataDestroy(pBlock);
×
886
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
887
  }
888
  return code;
435,611✔
889
}
890

891

892

893
static int extWinTsWinCompare(const void* pLeft, const void* pRight) {
8,696,393✔
894
  int64_t ts = *(int64_t*)pLeft;
8,696,393✔
895
  SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)pRight;
8,696,601✔
896
  if (ts < pWin->tw.skey) {
8,696,601✔
897
    return -1;
4,699,303✔
898
  }
899
  if (ts >= pWin->tw.ekey) {
3,998,154✔
900
    return 1;
1,351,622✔
901
  }
902

903
  return 0;
2,646,318✔
904
}
905

906

907
static int32_t extWinGetMultiTbWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol, int64_t rowNum, int32_t* startPos) {
1,802,451✔
908
  int32_t idx = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
1,802,451✔
909
  if (idx >= 0) {
1,802,451✔
910
    *startPos = 0;
1,757,696✔
911
    return idx;
1,757,696✔
912
  }
913

914
  SExtWinTimeWindow* pWin = NULL;
44,755✔
915
  int32_t w = 0;
44,755✔
916
  for (int64_t i = 1; i < rowNum; ++i) {
45,245✔
917
    for (; w < pExtW->pWins->size; ++w) {
114,313,640✔
918
      pWin = TARRAY_GET_ELEM(pExtW->pWins, w);
114,313,640✔
919
      if (tsCol[i] < pWin->tw.skey) {
114,313,640✔
920
        break;
490✔
921
      }
922
      
923
      if (tsCol[i] < pWin->tw.ekey) {
114,313,150✔
924
        *startPos = i;
44,265✔
925
        return w;
44,265✔
926
      }
927
    }
928
  }
929

930
  return -1;
490✔
931
}
932

933
static int32_t extWinGetNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
2,147,483,647✔
934
  SExternalWindowOperator* pExtW = pOperator->info;
2,147,483,647✔
935
  if ((*startPos) >= pInfo->rows) {
2,147,483,647✔
936
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
1,287,056✔
937
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
938
    *ppWin = NULL;
1,287,056✔
939
    return TSDB_CODE_SUCCESS;
1,287,056✔
940
  }
941
  
942
  if (pExtW->blkWinIdx < 0) {
2,147,483,647✔
943
    pExtW->blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
1,510,920✔
944
  } else {
945
    pExtW->blkWinIdx++;
2,147,483,647✔
946
  }
947

948
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
2,147,483,647✔
949
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
141,174✔
950
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
951
    *ppWin = NULL;
141,174✔
952
    return TSDB_CODE_SUCCESS;
141,174✔
953
  }
954
  
955
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
2,147,483,647✔
956
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
2,147,483,647✔
957
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
81,972✔
958
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
959
    *ppWin = NULL;
81,972✔
960
    return TSDB_CODE_SUCCESS;
81,972✔
961
  }
962

963
  int32_t r = *startPos;
2,147,483,647✔
964

965
  qDebug("%s %s start to get novlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, r);
2,147,483,647✔
966

967
  // TODO handle desc order
968
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
2,147,483,647✔
969
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
2,147,483,647✔
970
    for (; r < pInfo->rows; ++r) {
2,147,483,647✔
971
      if (tsCol[r] < pWin->tw.skey) {
2,147,483,647✔
972
        continue;
312,474,406✔
973
      }
974

975
      if (tsCol[r] < pWin->tw.ekey) {
2,147,483,647✔
976
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
1,891,528,179✔
977
        *ppWin = pWin;
1,891,528,179✔
978
        *startPos = r;
1,891,528,179✔
979
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
1,891,528,179✔
980

981
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
1,891,528,179✔
982
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
983
        
984
        return TSDB_CODE_SUCCESS;
1,891,528,179✔
985
      }
986

987
      if (!pOperator->pTaskInfo->pStreamRuntimeInfo && tsCol[r] >= pWin->tw.ekey) {
1,799,414,266✔
988
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
1,799,408,358✔
989
        *ppWin = pWin;
1,799,408,358✔
990
        *startPos = r;
1,799,408,358✔
991
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
1,799,408,358✔
992

993
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
1,799,408,358✔
994
               GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
995

996
        return TSDB_CODE_SUCCESS;
1,799,408,358✔
997
      }
998

999
      break;
5,908✔
1000
    }
1001

1002
    if (r == pInfo->rows) {
6,626✔
1003
      break;
718✔
1004
    }
1005
  }
1006

1007
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
718✔
1008
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1009

1010
  *ppWin = NULL;
718✔
1011
  return TSDB_CODE_SUCCESS;
718✔
1012
}
1013

1014
static int32_t extWinGetOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
190,554✔
1015
  SExternalWindowOperator* pExtW = pOperator->info;
190,554✔
1016
  if (pExtW->blkWinIdx < 0) {
190,554✔
1017
    pExtW->blkWinIdx = pExtW->blkWinStartIdx;
10,194✔
1018
  } else {
1019
    pExtW->blkWinIdx++;
180,360✔
1020
  }
1021

1022
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
190,554✔
1023
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
9,476✔
1024
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1025
    *ppWin = NULL;
9,476✔
1026
    return TSDB_CODE_SUCCESS;
9,476✔
1027
  }
1028
  
1029
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
181,078✔
1030
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
181,078✔
1031
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
1032
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1033
    *ppWin = NULL;
×
1034
    return TSDB_CODE_SUCCESS;
×
1035
  }
1036

1037
  int64_t r = 0;
181,078✔
1038

1039
  qDebug("%s %s start to get ovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);
181,078✔
1040
  
1041
  // TODO handle desc order
1042
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
181,796✔
1043
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
181,796✔
1044
    for (r = pExtW->blkRowStartIdx; r < pInfo->rows; ++r) {
315,344✔
1045
      if (tsCol[r] < pWin->tw.skey) {
314,626✔
1046
        pExtW->blkRowStartIdx = r + 1;
133,548✔
1047
        continue;
133,548✔
1048
      }
1049

1050
      if (tsCol[r] < pWin->tw.ekey) {
181,078✔
1051
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
180,360✔
1052
        *ppWin = pWin;
180,360✔
1053
        *startPos = r;
180,360✔
1054
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
180,360✔
1055

1056
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
180,360✔
1057
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);
1058
        
1059
        if ((r + *winRows) < pInfo->rows) {
180,360✔
1060
          pExtW->blkWinStartIdx = pExtW->blkWinIdx + 1;
170,884✔
1061
          pExtW->blkWinStartSet = true;
170,884✔
1062
        }
1063
        
1064
        return TSDB_CODE_SUCCESS;
180,360✔
1065
      }
1066

1067
      break;
718✔
1068
    }
1069

1070
    if (r >= pInfo->rows) {
1,436✔
1071
      if (!pExtW->blkWinStartSet) {
718✔
1072
        pExtW->blkWinStartIdx = pExtW->blkWinIdx;
718✔
1073
      }
1074
      
1075
      break;
718✔
1076
    }
1077
  }
1078

1079
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
718✔
1080
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1081

1082
  *ppWin = NULL;
718✔
1083
  return TSDB_CODE_SUCCESS;
718✔
1084
}
1085

1086

1087
static int32_t extWinGetMultiTbNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
738,116,412✔
1088
  SExternalWindowOperator* pExtW = pOperator->info;
738,116,412✔
1089
  if ((*startPos) >= pInfo->rows) {
738,118,746✔
1090
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
1,757,696✔
1091
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
1092
    *ppWin = NULL;
1,757,482✔
1093
    return TSDB_CODE_SUCCESS;
1,757,482✔
1094
  }
1095
  
1096
  if (pExtW->blkWinIdx < 0) {
736,363,200✔
1097
    pExtW->blkWinIdx = extWinGetMultiTbWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
1,802,451✔
1098
    if (pExtW->blkWinIdx < 0) {
1,802,451✔
1099
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
490✔
1100
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1101
      *ppWin = NULL;
490✔
1102
      return TSDB_CODE_SUCCESS;
490✔
1103
    }
1104

1105
    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
1,801,533✔
1106
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
1,801,747✔
1107
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
1,801,747✔
1108

1109
    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
1,801,961✔
1110
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);
1111
    
1112
    return TSDB_CODE_SUCCESS;
1,801,961✔
1113
  } else {
1114
    pExtW->blkWinIdx++;
734,562,456✔
1115
  }
1116

1117
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
734,561,819✔
1118
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
2,270✔
1119
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1120
    *ppWin = NULL;
2,270✔
1121
    return TSDB_CODE_SUCCESS;
2,270✔
1122
  }
1123
  
1124
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
734,559,340✔
1125
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
734,558,703✔
1126
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
41,995✔
1127
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1128
    *ppWin = NULL;
41,995✔
1129
    return TSDB_CODE_SUCCESS;
41,995✔
1130
  }
1131

1132
  int32_t r = *startPos;
734,516,499✔
1133

1134
  qDebug("%s %s start to get mnovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, r);
734,516,713✔
1135

1136
  // TODO handle desc order
1137
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
734,531,119✔
1138
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
734,530,263✔
1139
    for (; r < pInfo->rows; ++r) {
896,182,655✔
1140
      if (tsCol[r] < pWin->tw.skey) {
896,182,655✔
1141
        continue;
161,652,596✔
1142
      }
1143

1144
      if (tsCol[r] < pWin->tw.ekey) {
734,529,850✔
1145
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
648,241,793✔
1146
        *ppWin = pWin;
648,240,534✔
1147
        *startPos = r;
648,240,534✔
1148
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
648,241,161✔
1149

1150
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
648,240,504✔
1151
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
1152
        
1153
        return TSDB_CODE_SUCCESS;
648,242,425✔
1154
      }
1155

1156
      if (!pOperator->pTaskInfo->pStreamRuntimeInfo && tsCol[r] >= pWin->tw.ekey) {
86,288,689✔
1157
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
86,277,025✔
1158
        *ppWin = pWin;
86,277,025✔
1159
        *startPos = r;
86,277,025✔
1160
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
86,277,025✔
1161

1162
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
86,277,025✔
1163
               GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
1164

1165
        return TSDB_CODE_SUCCESS;
86,277,025✔
1166
      }
1167

1168
      break;
11,664✔
1169
    }
1170

1171
    if (r == pInfo->rows) {
11,664✔
1172
      break;
×
1173
    }
1174
  }
1175

1176
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
428✔
1177
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1178

1179
  *ppWin = NULL;
428✔
1180
  return TSDB_CODE_SUCCESS;
×
1181
}
1182

1183
static int32_t extWinGetFirstWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol,
951,728✔
1184
                                       int64_t rowNum, int32_t* startPos) {
1185
  SExtWinTimeWindow* pWin = NULL;
951,728✔
1186
  int32_t            idx = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
951,728✔
1187
  if (idx >= 0) {
951,936✔
1188
    for (int i = idx - 1; i >= 0; --i) {
888,836✔
1189
      pWin = TARRAY_GET_ELEM(pExtW->pWins, i);
×
1190
      if (extWinTsWinCompare(tsCol, pWin) == 0) {
×
1191
        idx = i;
×
1192
      } else {
1193
        break;
×
1194
      }
1195
    }
1196
    *startPos = 0;
888,836✔
1197
    return idx;
888,628✔
1198
  }
1199

1200
  pWin = NULL;
63,100✔
1201
  int32_t w = 0;
63,100✔
1202
  for (int64_t i = 1; i < rowNum; ++i) {
126,622✔
1203
    for (; w < pExtW->pWins->size; ++w) {
147,722✔
1204
      pWin = TARRAY_GET_ELEM(pExtW->pWins, w);
147,722✔
1205
      if (tsCol[i] < pWin->tw.skey) {
147,722✔
1206
        break;
63,522✔
1207
      }
1208

1209
      if (tsCol[i] < pWin->tw.ekey) {
84,200✔
1210
        *startPos = i;
21,100✔
1211
        return w;
21,100✔
1212
      }
1213
    }
1214
  }
1215

1216
  return -1;
42,000✔
1217
}
1218

1219
static int32_t extWinGetMultiTbOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
5,615,727✔
1220
  SExternalWindowOperator* pExtW = pOperator->info;
5,615,727✔
1221
  if (pExtW->blkWinIdx < 0) {
5,616,980✔
1222
    pExtW->blkWinIdx = extWinGetFirstWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
951,936✔
1223
    if (pExtW->blkWinIdx < 0) {
951,936✔
1224
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
42,000✔
1225
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1226
      *ppWin = NULL;
42,000✔
1227
      return TSDB_CODE_SUCCESS;
42,000✔
1228
    }
1229

1230
    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
909,728✔
1231
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
909,936✔
1232
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
909,936✔
1233
    
1234
    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
909,728✔
1235
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);
1236
    
1237
    return TSDB_CODE_SUCCESS;
909,936✔
1238
  } else {
1239
    pExtW->blkWinIdx++;
4,665,044✔
1240
  }
1241

1242
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
4,665,046✔
1243
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
909,936✔
1244
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1245
    *ppWin = NULL;
909,936✔
1246
    return TSDB_CODE_SUCCESS;
909,936✔
1247
  }
1248
  
1249
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
3,755,319✔
1250
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
3,754,905✔
1251
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
1252
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1253
    *ppWin = NULL;
×
1254
    return TSDB_CODE_SUCCESS;
×
1255
  }
1256

1257
  int64_t r = 0;
3,755,110✔
1258

1259
  qDebug("%s %s start to get movlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);
3,755,110✔
1260

1261
  // TODO handle desc order
1262
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
3,817,028✔
1263
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
3,816,819✔
1264
    for (r = pExtW->blkRowStartIdx; r < pInfo->rows; ++r) {
12,845,563✔
1265
      if (tsCol[r] < pWin->tw.skey) {
12,844,518✔
1266
        pExtW->blkRowStartIdx = r + 1;
9,028,744✔
1267
        continue;
9,028,744✔
1268
      }
1269

1270
      if (tsCol[r] < pWin->tw.ekey) {
3,817,028✔
1271
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
3,755,528✔
1272
        *ppWin = pWin;
3,755,528✔
1273
        *startPos = r;
3,755,528✔
1274
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
3,755,528✔
1275

1276
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
3,754,901✔
1277
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);
1278
        
1279
        return TSDB_CODE_SUCCESS;
3,755,528✔
1280
      }
1281

1282
      break;
61,500✔
1283
    }
1284

1285
    if (r >= pInfo->rows) {
61,500✔
1286
      break;
×
1287
    }
1288
  }
1289

1290
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
209✔
1291
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1292

1293
  *ppWin = NULL;
209✔
1294
  return TSDB_CODE_SUCCESS;
×
1295
}
1296

1297

1298
static int32_t extWinGetWinStartPos(STimeWindow win, const SDataBlockInfo* pBlockInfo, int32_t lastEndPos, int32_t order, int32_t* nextPos, int64_t* tsCol) {
×
1299
  bool ascQuery = order == TSDB_ORDER_ASC;
×
1300

1301
  if (win.ekey <= pBlockInfo->window.skey && ascQuery) {
×
1302
    return -2;
×
1303
  }
1304
//if (win.skey > pBlockInfo->window.ekey && !ascQuery) return -2;
1305

1306
  if (win.skey > pBlockInfo->window.ekey && ascQuery) return -1;
×
1307
//if (win.ekey < pBlockInfo->window.skey && !ascQuery) return -1;
1308

1309
  while (true) {
1310
    if (win.ekey <= tsCol[lastEndPos + 1] && ascQuery) return -2;
×
1311
    if (win.skey <= tsCol[lastEndPos + 1] && ascQuery) break;
×
1312
    lastEndPos++;
×
1313
  }
1314

1315
  *nextPos = lastEndPos + 1;
×
1316
  return 0;
×
1317
}
1318

1319
static int32_t extWinAggSetWinOutputBuf(SOperatorInfo* pOperator, SExtWinTimeWindow* win, SExprSupp* pSupp, 
2,147,483,647✔
1320
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
1321
  int32_t code = 0, lino = 0;
2,147,483,647✔
1322
  SResultRow* pResultRow = NULL;
2,147,483,647✔
1323
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
2,147,483,647✔
1324
  
1325
#if 0
1326
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
1327
                                                  true, tableGroupId, pTaskInfo, true, pAggSup, true);
1328
  if (pResultRow == NULL) {
1329
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
1330
    return pTaskInfo->code;
1331
  }
1332

1333
  qDebug("current result rows num:%d", tSimpleHashGetSize(pAggSup->pResultRowHashTable));
1334

1335
#else
1336
  if (win->winOutIdx >= 0) {
2,147,483,647✔
1337
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
2,147,483,647✔
1338
  } else {
1339
    win->winOutIdx = pExtW->outWinIdx++;
2,147,483,647✔
1340
    
1341
    qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
2,147,483,647✔
1342

1343
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
2,147,483,647✔
1344
    
1345
    memset(pResultRow, 0, pAggSup->resultRowSize);
2,147,483,647✔
1346

1347
    pResultRow->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,147,483,647✔
1348
    TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
2,147,483,647✔
1349
  }
1350
#endif
1351

1352
  // set time window for current result
1353
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
2,147,483,647✔
1354

1355
_exit:
2,147,483,647✔
1356
  
1357
  if (code) {
2,147,483,647✔
1358
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1359
  }
1360

1361
  return code;
2,147,483,647✔
1362
}
1363

1364
static int32_t extWinAggDo(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
2,147,483,647✔
1365
                                  SSDataBlock* pInputBlock) {
1366
  if (pOperator->pTaskInfo->pStreamRuntimeInfo && forwardRows == 0) {
2,147,483,647✔
1367
    return TSDB_CODE_SUCCESS;
×
1368
  }
1369

1370
  SExprSupp*               pSup = &pOperator->exprSupp;
2,147,483,647✔
1371
  SExternalWindowOperator* pExtW = pOperator->info;
2,147,483,647✔
1372
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
2,147,483,647✔
1373
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
2,147,483,647✔
1374

1375
}
1376

1377
static bool extWinLastWinClosed(SExternalWindowOperator* pExtW) {
490✔
1378
  if (pExtW->outWinIdx <= 0 || (pExtW->multiTableMode && !pExtW->inputHasOrder)) {
490✔
1379
    return false;
490✔
1380
  }
1381

1382
  if (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc) {
×
1383
    return true;
×
1384
  }
1385

1386
  SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outWinIdx - 1);
×
1387
  if (0 == listNEles(pList)) {
×
1388
    return true;
×
1389
  }
1390

1391
  SListNode* pNode = listTail(pList);
×
1392
  SArray* pBlkWinIdx = *((SArray**)pNode->data + 1);
×
1393
  int64_t* pIdx = taosArrayGetLast(pBlkWinIdx);
×
1394
  if (pIdx && *(int32_t*)pIdx < pExtW->blkWinStartIdx) {
×
1395
    return true;
×
1396
  }
1397

1398
  return false;
×
1399
}
1400

1401
static int32_t extWinGetWinResBlock(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
1,147✔
1402
  SExternalWindowOperator* pExtW = pOperator->info;
1,147✔
1403
  SList*                   pList = NULL;
1,147✔
1404
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
1,147✔
1405
  
1406
  if (pWin->winOutIdx >= 0) {
1,147✔
1407
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
657✔
1408
  } else {
1409
    if (extWinLastWinClosed(pExtW)) {
490✔
1410
      pWin->winOutIdx = pExtW->outWinIdx - 1;
×
1411
      pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1412
    } else {
1413
      pWin->winOutIdx = pExtW->outWinIdx++;
490✔
1414
      pList = tdListNew(POINTER_BYTES * 2);
490✔
1415
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);
490✔
1416
      SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
490✔
1417
      extWinRecycleBlockList(pExtW, ppList);
490✔
1418
      *ppList = pList;
490✔
1419
    }
1420
  }
1421
  
1422
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));
1,147✔
1423

1424
_exit:
1,147✔
1425

1426
  if (code) {
1,147✔
1427
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1428
  }
1429

1430
  return code;
1,147✔
1431
}
1432

1433
static int32_t extWinProjectDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
1,147✔
1434
  SExternalWindowOperator* pExtW = pOperator->info;
1,147✔
1435
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
1,147✔
1436
  SSDataBlock*             pResBlock = NULL;
1,147✔
1437
  SArray*                  pIdx = NULL;
1,147✔
1438
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
1,147✔
1439
  
1440
  TAOS_CHECK_EXIT(extWinGetWinResBlock(pOperator, rows, pWin, &pResBlock, &pIdx));
1,147✔
1441

1442
  qDebug("%s %s win[%" PRId64 ", %" PRId64 "] got res block %p winRowIdx %p, winOutIdx:%d, capacity:%d", 
1,147✔
1443
      pOperator->pTaskInfo->id.str, __func__, pWin->tw.skey, pWin->tw.ekey, pResBlock, pIdx, pWin->winOutIdx, pResBlock->info.capacity);
1444
  
1445
  if (!pExtW->pTmpBlock) {
1,147✔
1446
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
271✔
1447
  } else {
1448
    blockDataCleanup(pExtW->pTmpBlock);
876✔
1449
  }
1450
  
1451
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
1,147✔
1452

1453
  qDebug("%s %s start to copy %d rows to tmp blk", pOperator->pTaskInfo->id.str, __func__, rows);
1,147✔
1454
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
1,147✔
1455

1456
  qDebug("%s %s start to apply project to tmp blk", pOperator->pTaskInfo->id.str, __func__);
1,147✔
1457
  TAOS_CHECK_EXIT(projectApplyFunctionsWithSelect(pExprSup->pExprInfo, pResBlock, pExtW->pTmpBlock, pExprSup->pCtx, pExprSup->numOfExprs,
1,147✔
1458
        NULL, GET_STM_RTINFO(pOperator->pTaskInfo), true, pExprSup->hasIndefRowsFunc));
1459

1460
  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));
1,147✔
1461

1462
_exit:
1,147✔
1463

1464
  if (code) {
1,147✔
1465
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1466
  } else {
1467
    qDebug("%s %s project succeed", pOperator->pTaskInfo->id.str, __func__);
1,147✔
1468
  }
1469
  
1470
  return code;
1,147✔
1471
}
1472

1473
static int32_t extWinProjectOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
709✔
1474
  SExternalWindowOperator* pExtW = pOperator->info;
709✔
1475
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
709✔
1476
  SExtWinTimeWindow*       pWin = NULL;
709✔
1477
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
709✔
1478
  int32_t                  startPos = 0, winRows = 0;
709✔
1479
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
709✔
1480
  
1481
  while (true) {
1482
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
1,856✔
1483
    if (pWin == NULL) {
1,856✔
1484
      break;
709✔
1485
    }
1486

1487
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") project start, ascScan:%d, startPos:%d, winRows:%d",
1,147✔
1488
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1489
    
1490
    TAOS_CHECK_EXIT(extWinProjectDo(pOperator, pInputBlock, startPos, winRows, pWin));
1,147✔
1491
    
1492
    startPos += winRows;
1,147✔
1493
  }
1494
  
1495
_exit:
709✔
1496

1497
  if (code) {
709✔
1498
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1499
  }
1500

1501
  return code;
709✔
1502
}
1503

1504
static int32_t extWinIndefRowsDoImpl(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
×
1505
  SExternalWindowOperator* pExtW = pOperator->info;
×
1506
  SOptrBasicInfo*     pInfo = &pExtW->binfo;
×
1507
  SExprSupp*          pSup = &pOperator->exprSupp;
×
1508
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
×
1509
  int32_t order = pInfo->inputTsOrder;
×
1510
  int32_t scanFlag = pBlock->info.scanFlag;
×
1511
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
×
1512

1513
  SExprSupp* pScalarSup = &pExtW->scalarSupp;
×
1514
  if (pScalarSup->pExprInfo != NULL) {
×
1515
    TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
1516
                                 pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1517
  }
1518

1519
  TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, order, scanFlag, false));
×
1520

1521
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));
×
1522

1523
  TAOS_CHECK_EXIT(projectApplyFunctions(pSup->pExprInfo, pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
×
1524
                               pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1525

1526
_exit:
×
1527

1528
  if (code) {
×
1529
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
1530
  }
1531

1532
  return code;
×
1533
}
1534

1535
static int32_t extWinIndefRowsSetWinOutputBuf(SExternalWindowOperator* pExtW, SExtWinTimeWindow* win, SExprSupp* pSupp, 
×
1536
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo, bool reset) {
1537
  int32_t code = 0, lino = 0;
×
1538
  SResultRow* pResultRow = NULL;
×
1539

1540
  pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
×
1541
  
1542
  qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
×
1543

1544
  if (reset) {
×
1545
    memset(pResultRow, 0, pAggSup->resultRowSize);
×
1546
    for (int32_t k = 0; k < pSupp->numOfExprs; ++k) {
×
1547
      SqlFunctionCtx* pCtx = &pSupp->pCtx[k];
×
1548
      pCtx->pOutput = NULL;
×
1549
    }
1550
  }
1551

1552
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
×
1553

1554
  // set time window for current result
1555
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
×
1556

1557
_exit:
×
1558
  
1559
  if (code) {
×
1560
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1561
  }
1562

1563
  return code;
×
1564
}
1565

1566
static int32_t extWinGetSetWinResBlockBuf(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
×
1567
  SExternalWindowOperator* pExtW = pOperator->info;
×
1568
  SList*                   pList = NULL;
×
1569
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1570
  
1571
  if (pWin->winOutIdx >= 0) {
×
1572
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1573
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, false));
×
1574
  } else {
1575
    if (extWinLastWinClosed(pExtW)) {
×
1576
      pWin->winOutIdx = pExtW->outWinIdx - 1;
×
1577
      pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1578
    } else {
1579
      pWin->winOutIdx = pExtW->outWinIdx++;
×
1580
      pList = tdListNew(POINTER_BYTES * 2);
×
1581
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);
×
1582
      SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1583
      extWinRecycleBlockList(pExtW, ppList);
×
1584
      *ppList = pList;
×
1585
    }
1586
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, true));
×
1587
  }
1588
  
1589
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));
×
1590

1591
_exit:
×
1592

1593
  if (code) {
×
1594
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1595
  }
1596

1597
  return code;
×
1598
}
1599

1600

1601
static int32_t extWinIndefRowsDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
×
1602
  SExternalWindowOperator* pExtW = pOperator->info;
×
1603
  SSDataBlock*             pResBlock = NULL;
×
1604
  SArray*                  pIdx = NULL;
×
1605
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1606
  
1607
  TAOS_CHECK_EXIT(extWinGetSetWinResBlockBuf(pOperator, rows, pWin, &pResBlock, &pIdx));
×
1608
  
1609
  if (!pExtW->pTmpBlock) {
×
1610
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
×
1611
  } else {
1612
    blockDataCleanup(pExtW->pTmpBlock);
×
1613
  }
1614
  
1615
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
×
1616

1617
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
×
1618
  TAOS_CHECK_EXIT(extWinIndefRowsDoImpl(pOperator, pResBlock, pExtW->pTmpBlock));
×
1619

1620
  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));
×
1621

1622
_exit:
×
1623

1624
  if (code) {
×
1625
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1626
  }
1627
  
1628
  return code;
×
1629
}
1630

1631

1632
static int32_t extWinIndefRowsOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
×
1633
  SExternalWindowOperator* pExtW = pOperator->info;
×
1634
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
×
1635
  SExtWinTimeWindow*       pWin = NULL;
×
1636
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
×
1637
  int32_t                  startPos = 0, winRows = 0;
×
1638
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1639
  
1640
  while (true) {
1641
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
×
1642
    if (pWin == NULL) {
×
1643
      break;
×
1644
    }
1645

1646
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") indefRows start, ascScan:%d, startPos:%d, winRows:%d",
×
1647
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1648
    
1649
    TAOS_CHECK_EXIT(extWinIndefRowsDo(pOperator, pInputBlock, startPos, winRows, pWin));
×
1650
    
1651
    startPos += winRows;
×
1652
  }
1653
  
1654
_exit:
×
1655

1656
  if (code) {
×
1657
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1658
  }
1659

1660
  return code;
×
1661
}
1662

1663
static int32_t extWinNonAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
490✔
1664
  SExternalWindowOperator* pExtW = pOperator->info;
490✔
1665
  int32_t                  numOfWin = pExtW->outWinIdx;
490✔
1666
  int32_t                  code = TSDB_CODE_SUCCESS;
490✔
1667
  int32_t                  lino = 0;
490✔
1668
  SSDataBlock*             pRes = NULL;
490✔
1669

1670
  for (; pExtW->outputWinId < numOfWin; pExtW->outputWinId++, extWinIncCurWinOutIdx(pOperator->pTaskInfo->pStreamRuntimeInfo)) {
490✔
1671
    SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
490✔
1672
    if (listNEles(pList) <= 0) {
490✔
1673
      continue;
×
1674
    }
1675

1676
    SListNode* pNode = tdListPopHead(pList);
490✔
1677
    pRes = *(SSDataBlock**)pNode->data;
490✔
1678
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = *(SArray**)((SArray**)pNode->data + 1);
490✔
1679
    pExtW->pLastBlkNode = pNode;
490✔
1680

1681
    if (listNEles(pList) <= 0) {
490✔
1682
      pExtW->outputWinId++;
490✔
1683
      extWinIncCurWinOutIdx(pOperator->pTaskInfo->pStreamRuntimeInfo);
490✔
1684
    }
1685

1686
    break;
490✔
1687
  }
1688

1689
  if (pRes) {
490✔
1690
    qDebug("%s result generated, rows:%" PRId64 , GET_TASKID(pOperator->pTaskInfo), pRes->info.rows);
490✔
1691
    pRes->info.version = pOperator->pTaskInfo->version;
490✔
1692
    pRes->info.dataLoad = 1;
490✔
1693
  } else {
1694
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = NULL;
×
1695
    qDebug("%s ext window done", GET_TASKID(pOperator->pTaskInfo));
×
1696
  }
1697

1698
  *ppRes = (pRes && pRes->info.rows > 0) ? pRes : NULL;
490✔
1699

1700
_exit:
490✔
1701

1702
  if (code != TSDB_CODE_SUCCESS) {
490✔
1703
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1704
  }
1705

1706
  return code;
490✔
1707
}
1708

1709
static int32_t extWinAggHandleEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains, SExtWinTimeWindow* pWin) {
2,147,483,647✔
1710
  int32_t code = 0, lino = 0;
2,147,483,647✔
1711
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
2,147,483,647✔
1712
  SExprSupp* pSup = &pOperator->exprSupp;
2,147,483,647✔
1713
  int32_t currIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,147,483,647✔
1714

1715
  if (NULL == pExtW->pEmptyInputBlock || (pWin && pWin->tw.skey == pExtW->lastSKey)) {
2,147,483,647✔
1716
    goto _exit;
2,147,483,647✔
1717
  }
1718

1719
  bool ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
2,147,483,647✔
1720
  int32_t endIdx = allRemains ? (pExtW->pWins->size - 1) : (currIdx - 1);
2,147,483,647✔
1721
  SResultRowInfo* pResultRowInfo = &pExtW->binfo.resultRowInfo;
2,147,483,647✔
1722
  SSDataBlock* pInput = pExtW->pEmptyInputBlock;
2,147,483,647✔
1723

1724
  if ((pExtW->lastWinId + 1) <= endIdx) {
2,147,483,647✔
1725
    TAOS_CHECK_EXIT(setInputDataBlock(pSup, pExtW->pEmptyInputBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
509,228✔
1726
  }
1727
  
1728
  for (int32_t i = pExtW->lastWinId + 1; i <= endIdx; ++i) {
2,147,483,647✔
1729
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
1,175,775,627✔
1730

1731
    extWinSetCurWinIdx(pOperator, i);
1,175,775,627✔
1732
    qDebug("%s %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
1,175,775,627✔
1733
           GET_TASKID(pOperator->pTaskInfo), i, pWin->tw.skey, pWin->tw.ekey, ascScan);
1734

1735
    TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pWin, pSup, &pExtW->aggSup, pOperator->pTaskInfo));
1,175,775,627✔
1736

1737
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
1,175,775,627✔
1738
    code = extWinAggDo(pOperator, 0, 1, pInput);
1,175,775,627✔
1739
    pExtW->lastWinId = i;  
1,175,775,627✔
1740
    TAOS_CHECK_EXIT(code);
1,175,775,627✔
1741
  }
1742

1743
  
1744
_exit:
2,147,483,647✔
1745

1746
  if (code) {
2,147,483,647✔
1747
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
1748
  } else {
1749
    if (pBlock) {
2,147,483,647✔
1750
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true));
2,147,483,647✔
1751
    }
1752

1753
    if (!allRemains) {
2,147,483,647✔
1754
      extWinSetCurWinIdx(pOperator, currIdx);  
2,147,483,647✔
1755
    }
1756
  }
1757

1758
  return code;
2,147,483,647✔
1759
}
1760

1761
static int32_t extWinAggOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
4,274,370✔
1762
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
4,274,370✔
1763
  int32_t                  startPos = 0, winRows = 0;
4,274,578✔
1764
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
4,274,792✔
1765
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
4,274,379✔
1766
  int32_t                  code = 0, lino = 0;
4,274,379✔
1767
  SExtWinTimeWindow*       pWin = NULL;
4,274,379✔
1768
  bool                     scalarCalc = false;
4,274,379✔
1769

1770
  while (true) {
1771
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
2,147,483,647✔
1772
    if (pWin == NULL) {
2,147,483,647✔
1773
      break;
4,274,578✔
1774
    }
1775

1776
    TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pInputBlock, false, pWin));
2,147,483,647✔
1777

1778
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") agg start, ascScan:%d, startPos:%d, winRows:%d",
2,147,483,647✔
1779
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1780

1781
    if (!scalarCalc) {
2,147,483,647✔
1782
      if (pExtW->scalarSupp.pExprInfo) {
4,231,584✔
1783
        SExprSupp* pScalarSup = &pExtW->scalarSupp;
5,643✔
1784
        TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pInputBlock, pInputBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
5,643✔
1785
                                     pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1786
      }
1787
      
1788
      scalarCalc = true;
4,231,584✔
1789
    }
1790

1791
    if (pWin->tw.skey != pExtW->lastSKey || pWin->tw.skey == INT64_MIN) {
2,147,483,647✔
1792
      TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo));
2,147,483,647✔
1793
    }
1794
    
1795
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
2,147,483,647✔
1796
    TAOS_CHECK_EXIT(extWinAggDo(pOperator, startPos, winRows, pInputBlock));
2,147,483,647✔
1797
    
1798
    pExtW->lastSKey = pWin->tw.skey;
2,147,483,647✔
1799
    pExtW->lastWinId = extWinGetCurWinIdx(pOperator->pTaskInfo);
2,147,483,647✔
1800
    startPos += winRows;
2,147,483,647✔
1801
  }
1802

1803
_exit:
4,274,578✔
1804

1805
  if (code) {
4,274,578✔
1806
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1807
  }
1808

1809
  return code;
4,274,578✔
1810
}
1811

1812
static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,116,216✔
1813
  SExternalWindowOperator* pExtW = pOperator->info;
1,116,216✔
1814
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,116,216✔
1815
  SSDataBlock*             pBlock = pExtW->binfo.pRes;
1,116,216✔
1816
  int32_t                  code = TSDB_CODE_SUCCESS;
1,116,216✔
1817
  int32_t                  lino = 0;
1,116,216✔
1818
  SExprInfo*               pExprInfo = pOperator->exprSupp.pExprInfo;
1,116,216✔
1819
  int32_t                  numOfExprs = pOperator->exprSupp.numOfExprs;
1,116,216✔
1820
  int32_t*                 rowEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;
1,116,216✔
1821
  SqlFunctionCtx*          pCtx = pOperator->exprSupp.pCtx;
1,116,216✔
1822
  int32_t                  numOfWin = pExtW->outWinIdx;
1,116,216✔
1823

1824
  pBlock->info.version = pTaskInfo->version;
1,116,216✔
1825
  blockDataCleanup(pBlock);
1,116,216✔
1826
  taosArrayClear(pExtW->pWinRowIdx);
1,116,216✔
1827

1828
  for (; pExtW->outputWinId < pExtW->pWins->size; ++pExtW->outputWinId) {
2,147,483,647✔
1829
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->outputWinId);
2,147,483,647✔
1830
    int32_t            winIdx = pWin->winOutIdx;
2,147,483,647✔
1831
    if (winIdx < 0) {
2,147,483,647✔
1832
      continue;
11,757,586✔
1833
    }
1834

1835
    pExtW->outputWinNum++;
2,147,483,647✔
1836
    SResultRow* pRow = (SResultRow*)((char*)pExtW->pResultRow + winIdx * pExtW->aggSup.resultRowSize);
2,147,483,647✔
1837

1838
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
2,147,483,647✔
1839

1840
    // no results, continue to check the next one
1841
    if (pRow->numOfRows == 0) {
2,147,483,647✔
1842
      continue;
×
1843
    }
1844

1845
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
2,147,483,647✔
1846
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + numOfWin - pExtW->outputWinNum;
849,524✔
1847
      TAOS_CHECK_EXIT(blockDataEnsureCapacity(pBlock, newSize));
849,524✔
1848
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
849,524✔
1849
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
1850
    }
1851

1852
    TAOS_CHECK_EXIT(copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo));
2,147,483,647✔
1853

1854
    pBlock->info.rows += pRow->numOfRows;
2,147,483,647✔
1855

1856
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pRow->winIdx, pRow->numOfRows));
2,147,483,647✔
1857

1858
    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
2,147,483,647✔
1859
      ++pExtW->outputWinId;
769,765✔
1860
      break;
769,765✔
1861
    }
1862
  }
1863

1864
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
1,116,425✔
1865
         pBlock->info.id.groupId);
1866
         
1867
  pBlock->info.dataLoad = 1;
1,116,425✔
1868

1869
  *ppRes = (pBlock->info.rows > 0) ? pBlock : NULL;
1,116,216✔
1870

1871
  if (*ppRes) {
1,116,216✔
1872
    (*ppRes)->info.window.skey = pExtW->orgTableTimeRange.skey;
953,317✔
1873
    (*ppRes)->info.window.ekey = pExtW->orgTableTimeRange.ekey;
953,317✔
1874
  }
1875
  if (pOperator->pTaskInfo->pStreamRuntimeInfo) {
1,116,216✔
1876
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;
326,405✔
1877
  }
1878

1879
_exit:
789,811✔
1880

1881
  if (code != TSDB_CODE_SUCCESS) {
1,116,216✔
1882
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1883
  }
1884

1885
  return code;
1,116,216✔
1886
}
1887

1888
static int32_t extWinInitResultRow(SExecTaskInfo*        pTaskInfo, SExternalWindowOperator* pExtW, int32_t winNum) {
952,559✔
1889
  if (EEXT_MODE_SCALAR == pExtW->mode) {
952,559✔
1890
    return TSDB_CODE_SUCCESS;
271✔
1891
  }
1892

1893
  if (winNum <= pExtW->resultRowCapacity) {
952,710✔
1894
    return TSDB_CODE_SUCCESS;
102,131✔
1895
  }
1896
  
1897
  taosMemoryFreeClear(pExtW->pResultRow);
850,579✔
1898
  pExtW->resultRowCapacity = -1;
850,579✔
1899

1900
  int32_t code = 0, lino = 0;
850,579✔
1901
  
1902
  pExtW->pResultRow = taosMemoryCalloc(winNum, pExtW->aggSup.resultRowSize);
850,579✔
1903
  QUERY_CHECK_NULL(pExtW->pResultRow, code, lino, _exit, terrno);
850,579✔
1904

1905
  pExtW->resultRowCapacity = winNum;
850,579✔
1906

1907
_exit:
850,579✔
1908

1909
  if (code) {
850,579✔
1910
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1911
  }
1912

1913
  return code;
850,579✔
1914
}
1915

1916
static void extWinFreeResultRow(SExternalWindowOperator* pExtW) {
162,899✔
1917
  if (pExtW->resultRowCapacity * pExtW->aggSup.resultRowSize >= 1048576) {
162,899✔
1918
    taosMemoryFreeClear(pExtW->pResultRow);
1,881✔
1919
    pExtW->resultRowCapacity = -1;
1,881✔
1920
  }
1921
  if (pExtW->binfo.pRes && pExtW->binfo.pRes->info.rows * pExtW->aggSup.resultRowSize >= 1048576) {
162,899✔
1922
    blockDataFreeCols(pExtW->binfo.pRes);
×
1923
  }
1924
}
162,899✔
1925

1926
static int32_t extWinInitWindowList(SExternalWindowOperator* pExtW, SExecTaskInfo*        pTaskInfo) {
163,170✔
1927
  if (taosArrayGetSize(pExtW->pWins) > 0) {
163,170✔
1928
    return TSDB_CODE_SUCCESS;
×
1929
  }
1930
  
1931
  int32_t code = 0, lino = 0;
163,170✔
1932
  SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
163,170✔
1933
  size_t size = taosArrayGetSize(pInfo->pStreamPesudoFuncVals);
162,956✔
1934
  SExtWinTimeWindow* pWin = taosArrayReserve(pExtW->pWins, size);
162,956✔
1935
  TSDB_CHECK_NULL(pWin, code, lino, _exit, terrno);
162,956✔
1936

1937
  TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, size));
162,956✔
1938

1939
  if (pExtW->timeRangeExpr && pExtW->timeRangeExpr->needCalc) {
163,170✔
1940
    TAOS_CHECK_EXIT(scalarCalculateExtWinsTimeRange(pExtW->timeRangeExpr, pInfo, pWin));
62,785✔
1941
    if (qDebugFlag & DEBUG_DEBUG) {
62,785✔
1942
      for (int32_t i = 0; i < size; ++i) {
219,262✔
1943
        qDebug("%s the %d/%d ext window calced initialized, TR[%" PRId64 ", %" PRId64 ")", 
156,477✔
1944
            pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
1945
      }
1946
    }
1947
  } else {
1948
    for (int32_t i = 0; i < size; ++i) {
11,394,570✔
1949
      SSTriggerCalcParam* pParam = taosArrayGet(pInfo->pStreamPesudoFuncVals, i);
11,294,185✔
1950

1951
      pWin[i].tw.skey = pParam->wstart;
11,294,185✔
1952
      pWin[i].tw.ekey = pParam->wend + ((pInfo->triggerType != STREAM_TRIGGER_SLIDING) ? 1 : 0);
11,294,185✔
1953
      pWin[i].winOutIdx = -1;
11,294,185✔
1954

1955
      qDebug("%s the %d/%d ext window initialized, TR[%" PRId64 ", %" PRId64 ")", 
11,294,185✔
1956
          pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
1957
    }
1958
  }
1959
  
1960
  pExtW->outputWinId = pInfo->curIdx;
163,170✔
1961
  pExtW->lastWinId = -1;
163,170✔
1962
  pExtW->blkWinStartIdx = pInfo->curIdx;
163,170✔
1963

1964
_exit:
163,170✔
1965

1966
  if (code) {
163,170✔
1967
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1968
  }
1969

1970
  return code;
163,170✔
1971
}
1972

1973
static bool extWinNonAggGotResBlock(SExternalWindowOperator* pExtW) {
709✔
1974
  if (pExtW->multiTableMode && !pExtW->inputHasOrder) {
709✔
1975
    return false;
657✔
1976
  }
1977
  int32_t remainWin = pExtW->outWinIdx - pExtW->outputWinId;
52✔
1978
  if (remainWin > 1 && (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc)) {
52✔
1979
    return true;
×
1980
  }
1981
  
1982
  SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
52✔
1983
  if (!pList || listNEles(pList) <= 0) {
52✔
1984
    return false;
×
1985
  }
1986
  if (listNEles(pList) > 1) {
52✔
1987
    return true;
×
1988
  }
1989

1990
  SListNode* pNode = listHead(pList);
52✔
1991
  SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
52✔
1992
  int32_t* winIdx = taosArrayGetLast(pIdx);
52✔
1993
  if (winIdx && *winIdx < pExtW->blkWinStartIdx) {
52✔
1994
    return true;
×
1995
  }
1996

1997
  return false;
52✔
1998
}
1999

2000
static int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
1,995,291✔
2001
  int32_t code = TSDB_CODE_SUCCESS;
1,995,291✔
2002
  int32_t lino = 0;
1,995,291✔
2003
  int32_t tsIndex = -1;
1,995,291✔
2004
  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
2,526,471✔
2005
    SColumnInfoData *pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
2,526,471✔
2006
    QUERY_CHECK_NULL(pCol, code, lino, _return, terrno)
2,526,471✔
2007
    if (pCol->info.colId == tsSlotId) {
2,526,471✔
2008
      tsIndex = i;
1,995,291✔
2009
      break;
1,995,291✔
2010
    }
2011
  }
2012

2013
  if (tsIndex == -1) {
1,995,291✔
2014
    tsIndex = (int32_t)taosArrayGetSize(pBlock->pDataBlock) - 1;
×
2015
  }
2016

2017
  SColumnInfoData *pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsIndex);
1,995,291✔
2018
  QUERY_CHECK_NULL(pColData, code, lino, _return, terrno)
1,995,291✔
2019

2020
  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, 0), 0);
1,995,291✔
2021
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, pBlock->info.rows - 1), 0);
1,995,291✔
2022

2023
  return code;
1,995,291✔
2024
_return:
×
2025
  qError("failed to get time window of block, %s code:%s, line:%d", __func__, tstrerror(code), lino);
×
2026
  return code;
×
2027
}
2028

2029
static int32_t extWinOpen(SOperatorInfo* pOperator) {
952,981✔
2030
  if (OPTR_IS_OPENED(pOperator) && !pOperator->pOperatorGetParam) {
952,981✔
2031
    return TSDB_CODE_SUCCESS;
×
2032
  }
2033
  
2034
  int32_t                  code = 0;
952,981✔
2035
  int32_t                  lino = 0;
952,981✔
2036
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
952,981✔
2037
  SOperatorInfo*           pDownstream = pOperator->pDownstream[0];
952,981✔
2038
  SExternalWindowOperator* pExtW = pOperator->info;
952,981✔
2039
  SExprSupp*               pSup = &pOperator->exprSupp;
952,981✔
2040

2041
  if (pOperator->pOperatorGetParam) {
952,981✔
2042
    SOperatorParam*               pParam = (SOperatorParam*)(pOperator->pOperatorGetParam);
789,811✔
2043
    SOperatorParam*               pDownParam = (SOperatorParam*)(pOperator->pDownstreamGetParams[0]);
789,811✔
2044
    SExchangeOperatorParam*       pExecParam = NULL;
789,811✔
2045
    SExternalWindowOperatorParam* pExtPram = (SExternalWindowOperatorParam*)pParam->value;
789,811✔
2046

2047
    if (pExtW->pWins) {
789,811✔
2048
      taosArrayDestroy(pExtW->pWins);
789,811✔
2049
    }
2050

2051
    pExtW->pWins = pExtPram->ExtWins;
789,811✔
2052

2053
    TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, taosArrayGetSize(pExtW->pWins)));
789,811✔
2054
    pExtPram->ExtWins = NULL;
789,811✔
2055
    pExtW->outputWinId = 0;
789,811✔
2056
    pExtW->lastWinId = -1;
789,811✔
2057
    pExtW->blkWinStartIdx = 0;
789,811✔
2058
    pExtW->outWinIdx = 0;
789,811✔
2059
    pExtW->lastSKey = INT64_MIN;
789,811✔
2060
    pExtW->isDynWindow = true;
789,811✔
2061
    pExtW->orgTableTimeRange.skey = INT64_MAX;
789,811✔
2062
    pExtW->orgTableTimeRange.ekey = INT64_MIN;
789,811✔
2063

2064
    QUERY_CHECK_CONDITION(pOperator->numOfDownstream == 1, code, lino, _exit, TSDB_CODE_INVALID_PARA)
789,811✔
2065

2066
    switch (pDownParam->opType) {
789,811✔
2067
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
789,811✔
2068
        pExecParam = (SExchangeOperatorParam*)((SOperatorParam*)(pOperator->pDownstreamGetParams[0]))->value;
789,811✔
2069
        if (!pExecParam->multiParams) {
789,811✔
2070
          pExecParam->basic.vgId = pExtW->orgTableVgId;
689,931✔
2071
          taosArrayClear(pExecParam->basic.uidList);
689,931✔
2072
          QUERY_CHECK_NULL(taosArrayPush(pExecParam->basic.uidList, &pExtW->orgTableUid), code, lino, _exit, terrno)
1,379,862✔
2073
        }
2074
        break;
789,811✔
2075
      }
2076
      default:
×
2077
        break;
×
2078
    }
2079

2080
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
789,811✔
2081
    pOperator->pOperatorGetParam = NULL;
789,811✔
2082
  } else {
2083
    TAOS_CHECK_EXIT(extWinInitWindowList(pExtW, pTaskInfo));
163,170✔
2084
  }
2085

2086
  while (1) {
4,275,287✔
2087
    pExtW->blkWinIdx = -1;
5,228,268✔
2088
    pExtW->blkWinStartSet = false;
5,228,268✔
2089
    pExtW->blkRowStartIdx = 0;
5,228,268✔
2090

2091
    SSDataBlock* pBlock = getNextBlockFromDownstreamRemain(pOperator, 0);
5,228,268✔
2092
    if (pOperator->pDownstreamGetParams) {
5,228,482✔
2093
      pOperator->pDownstreamGetParams[0] = NULL;
2,785,102✔
2094
    }
2095
    if (pBlock == NULL) {
5,228,482✔
2096
      if (EEXT_MODE_AGG == pExtW->mode) {
952,981✔
2097
        TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pBlock, true, NULL));
952,710✔
2098
      }
2099
      pExtW->blkWinStartIdx = pExtW->pWins->size;
952,981✔
2100
      break;
952,981✔
2101
    }
2102

2103
    if (pExtW->isDynWindow) {
4,275,501✔
2104
      TSKEY skey = 0;
1,995,291✔
2105
      TSKEY ekey = 0;
1,995,291✔
2106
      code = getTimeWindowOfBlock(pBlock, pExtW->primaryTsIndex, &skey, &ekey);
1,995,291✔
2107
      QUERY_CHECK_CODE(code, lino, _exit);
1,995,291✔
2108
      pExtW->orgTableTimeRange.skey = TMIN(pExtW->orgTableTimeRange.skey, skey);
1,995,291✔
2109
      pExtW->orgTableTimeRange.ekey = TMAX(pExtW->orgTableTimeRange.ekey, ekey);
1,995,291✔
2110
    }
2111

2112
    printDataBlock(pBlock, __func__, pTaskInfo->id.str, pTaskInfo->id.queryId);
4,275,501✔
2113

2114
    qDebug("ext window mode:%d got %" PRId64 " rows from downstream", pExtW->mode, pBlock->info.rows);
4,275,501✔
2115
    
2116
    switch (pExtW->mode) {
4,275,501✔
2117
      case EEXT_MODE_SCALAR:
709✔
2118
        TAOS_CHECK_EXIT(extWinProjectOpen(pOperator, pBlock));
709✔
2119
        if (extWinNonAggGotResBlock(pExtW)) {
709✔
2120
          return code;
×
2121
        }
2122
        break;
709✔
2123
      case EEXT_MODE_AGG:
4,274,792✔
2124
        TAOS_CHECK_EXIT(extWinAggOpen(pOperator, pBlock));
4,274,792✔
2125
        break;
4,274,578✔
2126
      case EEXT_MODE_INDEFR_FUNC:
×
2127
        TAOS_CHECK_EXIT(extWinIndefRowsOpen(pOperator, pBlock));
×
2128
        if (extWinNonAggGotResBlock(pExtW)) {
×
2129
          return code;
×
2130
        }
2131
        break;
×
2132
      default:
×
2133
        break;
×
2134
    }
2135
  }
2136

2137
  if (pOperator->pOperatorGetParam) {
952,981✔
2138
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
×
2139
    pOperator->pOperatorGetParam = NULL;
×
2140
  }
2141
  OPTR_SET_OPENED(pOperator);
952,981✔
2142

2143
#if 0
2144
  if (pExtW->mode == EEXT_MODE_AGG) {
2145
    qDebug("ext window before dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
2146

2147
    code = initGroupedResultInfo(&pExtW->groupResInfo, pExtW->aggSup.pResultRowHashTable, pExtW->binfo.inputTsOrder);
2148
    QUERY_CHECK_CODE(code, lino, _exit);
2149

2150
    qDebug("ext window after dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
2151
  }
2152
#endif
2153

2154
_exit:
952,981✔
2155

2156
  if (code != 0) {
952,981✔
2157
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
2158
    pTaskInfo->code = code;
×
2159
    T_LONG_JMP(pTaskInfo->env, code);
×
2160
  }
2161
  
2162
  return code;
952,981✔
2163
}
2164

2165
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,116,492✔
2166
  int32_t                  code = 0;
1,116,492✔
2167
  int32_t                  lino = 0;
1,116,492✔
2168
  SExternalWindowOperator* pExtW = pOperator->info;
1,116,492✔
2169
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,116,706✔
2170

2171
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
1,116,706✔
2172
    *ppRes = NULL;
×
2173
    return code;
×
2174
  }
2175

2176
  if (pOperator->pOperatorGetParam) {
1,116,706✔
2177
    if (pOperator->status == OP_EXEC_DONE) {
789,811✔
2178
      pOperator->status = OP_NOT_OPENED;
27,303✔
2179
    }
2180
  }
2181

2182
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);
1,116,706✔
2183

2184
  if (pOperator->status == OP_NOT_OPENED) {
1,116,706✔
2185
    TAOS_CHECK_EXIT(pOperator->fpSet._openFn(pOperator));
952,981✔
2186
  }
2187

2188
  if (pExtW->mode == EEXT_MODE_SCALAR || pExtW->mode == EEXT_MODE_INDEFR_FUNC) {
1,116,706✔
2189
    TAOS_CHECK_EXIT(extWinNonAggOutputRes(pOperator, ppRes));
490✔
2190
    if (NULL == *ppRes) {
490✔
2191
      setOperatorCompleted(pOperator);
×
2192
      extWinFreeResultRow(pExtW);
×
2193
    }
2194
  } else {
2195
#if 0    
2196
    doBuildResultDatablock(pOperator, &pExtW->binfo, &pExtW->groupResInfo, pExtW->aggSup.pResultBuf);
2197
    bool hasRemain = hasRemainResults(&pExtW->groupResInfo);
2198
    if (!hasRemain) {
2199
      setOperatorCompleted(pOperator);
2200
      break;
2201
    }
2202
    if (pExtW->binfo.pRes->info.rows > 0) break;
2203
#else
2204
    TAOS_CHECK_EXIT(extWinAggOutputRes(pOperator, ppRes));
1,116,216✔
2205
    if (NULL == *ppRes) {
1,116,216✔
2206
      setOperatorCompleted(pOperator);
162,899✔
2207
      if (pTaskInfo->pStreamRuntimeInfo) {
162,899✔
2208
        extWinFreeResultRow(pExtW);
162,899✔
2209
      }
2210
    }
2211
#endif      
2212
  }
2213

2214
  if (*ppRes) {
1,116,706✔
2215
    pOperator->resultInfo.totalRows += (*ppRes)->info.rows;
953,807✔
2216
    printDataBlock(*ppRes, __func__, GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
953,807✔
2217
  }
2218
  
2219
_exit:
162,899✔
2220

2221
  if (code) {
1,116,706✔
2222
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
×
2223
    pTaskInfo->code = code;
×
2224
    T_LONG_JMP(pTaskInfo->env, code);
×
2225
  }
2226

2227
  if ((*ppRes) && (*ppRes)->info.rows <= 0) {
1,116,706✔
2228
    *ppRes = NULL;
×
2229
  }
2230

2231
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM && (*ppRes)) {
1,116,706✔
2232
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
163,996✔
2233
  }
2234
  
2235
  return code;
1,116,706✔
2236
}
2237

2238

2239
int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
840,983✔
2240
                                     SOperatorInfo** pOptrOut) {
2241
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
840,983✔
2242
  QRY_PARAM_CHECK(pOptrOut);
840,983✔
2243
  int32_t                  code = 0;
840,983✔
2244
  int32_t                  lino = 0;
840,983✔
2245
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
840,983✔
2246
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
840,983✔
2247
  pOperator->pPhyNode = pNode;
840,983✔
2248
  if (!pExtW || !pOperator) {
840,983✔
2249
    code = terrno;
×
2250
    lino = __LINE__;
×
2251
    goto _error;
×
2252
  }
2253
  
2254
  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
840,983✔
2255
                  pExtW, pTaskInfo);
2256
                  
2257
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
840,983✔
2258
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
840,983✔
2259
  initBasicInfo(&pExtW->binfo, pResBlock);
840,983✔
2260

2261
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
840,983✔
2262
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
840,983✔
2263
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
840,983✔
2264
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
840,983✔
2265
  pExtW->isDynWindow = false;
840,983✔
2266

2267
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
840,983✔
2268
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
78,475✔
2269
  }
2270

2271
  // pExtW->limitInfo = (SLimitInfo){0};
2272
  // initLimitInfo(pPhynode->window.node.pLimit, pPhynode->window.node.pSlimit, &pExtW->limitInfo);
2273

2274
  if (pPhynode->window.pProjs) {
840,983✔
2275
    int32_t    numOfScalarExpr = 0;
271✔
2276
    SExprInfo* pScalarExprInfo = NULL;
271✔
2277
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
271✔
2278
    QUERY_CHECK_CODE(code, lino, _error);
271✔
2279

2280
    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
271✔
2281
    QUERY_CHECK_CODE(code, lino, _error);
271✔
2282

2283
  //if (pExtW->multiTableMode) {
2284
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
271✔
2285
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _error);
271✔
2286
  //}
2287
    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
271✔
2288
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
271✔
2289
  } else if (pExtW->mode == EEXT_MODE_AGG) {
840,712✔
2290
    if (pPhynode->window.pExprs != NULL) {
840,712✔
2291
      int32_t    num = 0;
1,881✔
2292
      SExprInfo* pSExpr = NULL;
1,881✔
2293
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
1,881✔
2294
      QUERY_CHECK_CODE(code, lino, _error);
1,881✔
2295
    
2296
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
1,881✔
2297
      if (code != TSDB_CODE_SUCCESS) {
1,881✔
2298
        goto _error;
×
2299
      }
2300
      checkIndefRowsFuncs(&pExtW->scalarSupp);
1,881✔
2301
    }
2302
    
2303
    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
840,712✔
2304
    initResultSizeInfo(&pOperator->resultInfo, 4096);
840,712✔
2305
    //code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
2306
    //QUERY_CHECK_CODE(code, lino, _error);
2307

2308
    pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
840,712✔
2309
    TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
840,712✔
2310
    
2311
    int32_t num = 0;
840,712✔
2312
    SExprInfo* pExprInfo = NULL;
840,712✔
2313
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
840,712✔
2314
    QUERY_CHECK_CODE(code, lino, _error);
840,712✔
2315
    pOperator->exprSupp.hasWindow = true;
840,712✔
2316
    pOperator->exprSupp.hasWindowOrGroup = true;
840,712✔
2317
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, 0, 0);
840,501✔
2318
    QUERY_CHECK_CODE(code, lino, _error);
840,712✔
2319

2320
    nodesWalkExprs(pPhynode->window.pFuncs, extWinHasCountLikeFunc, &pExtW->hasCountFunc);
840,712✔
2321
    if (pExtW->hasCountFunc) {
840,712✔
2322
      code = extWinCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
435,611✔
2323
      QUERY_CHECK_CODE(code, lino, _error);
435,611✔
2324
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
435,611✔
2325
    } else {
2326
      qDebug("%s ext window doesn't have CountLikeFunc", pOperator->pTaskInfo->id.str);
405,101✔
2327
    }
2328

2329
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
840,712✔
2330
    QUERY_CHECK_CODE(code, lino, _error);
840,712✔
2331

2332
    pExtW->lastSKey = INT64_MIN;
840,712✔
2333
  } else {
2334
    size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2335
    
2336
    if (pPhynode->window.pExprs != NULL) {
×
2337
      int32_t    num = 0;
×
2338
      SExprInfo* pSExpr = NULL;
×
2339
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
2340
      QUERY_CHECK_CODE(code, lino, _error);
×
2341
    
2342
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
2343
      if (code != TSDB_CODE_SUCCESS) {
×
2344
        goto _error;
×
2345
      }
2346
    }
2347
    
2348
    int32_t    numOfExpr = 0;
×
2349
    SExprInfo* pExprInfo = NULL;
×
2350
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
×
2351
    TSDB_CHECK_CODE(code, lino, _error);
×
2352
    
2353
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
×
2354
                              pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2355
    TSDB_CHECK_CODE(code, lino, _error);
×
2356
    pOperator->exprSupp.hasWindowOrGroup = false;
×
2357
    
2358
    //code = setFunctionResultOutput(pOperator, &pExtW->binfo, &pExtW->aggSup, MAIN_SCAN, numOfExpr);
2359
    //TSDB_CHECK_CODE(code, lino, _error);
2360
    
2361
    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
×
2362
                              pTaskInfo->pStreamRuntimeInfo);
×
2363
    TSDB_CHECK_CODE(code, lino, _error);
×
2364
    
2365
    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
×
2366
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
×
2367
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
×
2368
    TSDB_CHECK_CODE(code, lino, _error);
×
2369

2370
  //if (pExtW->multiTableMode) {
2371
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
×
2372
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _error);
×
2373
  //}
2374
    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
×
2375
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);  
×
2376
  }
2377

2378
  pExtW->pWins = taosArrayInit(4096, sizeof(SExtWinTimeWindow));
840,983✔
2379
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
840,983✔
2380
  
2381
  //initResultRowInfo(&pExtW->binfo.resultRowInfo);
2382

2383
  pExtW->timeRangeExpr = (STimeRangeNode*)pPhynode->pTimeRange;
840,983✔
2384
  if (pExtW->timeRangeExpr) {
840,983✔
2385
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pStart, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
78,475✔
2386
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pEnd, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
78,475✔
2387
  }
2388

2389
  if (pPhynode->isSingleTable) {
840,983✔
2390
    pExtW->getWinFp = (pExtW->timeRangeExpr && (pExtW->timeRangeExpr->needCalc || (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP))) ? extWinGetOvlpWin : extWinGetNoOvlpWin;
711,183✔
2391
    pExtW->multiTableMode = false;
711,183✔
2392
  } else {
2393
    pExtW->getWinFp = (pExtW->timeRangeExpr && (pExtW->timeRangeExpr->needCalc || (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP))) ? extWinGetMultiTbOvlpWin : extWinGetMultiTbNoOvlpWin;
129,800✔
2394
    pExtW->multiTableMode = true;
129,800✔
2395
  }
2396
  pExtW->inputHasOrder = pPhynode->inputHasOrder;
840,983✔
2397
  pExtW->orgTableUid = pPhynode->orgTableUid;
840,983✔
2398
  pExtW->orgTableVgId = pPhynode->orgTableVgId;
840,983✔
2399

2400
  pOperator->fpSet = createOperatorFpSet(extWinOpen, extWinNext, NULL, destroyExternalWindowOperatorInfo,
840,983✔
2401
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2402
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
840,983✔
2403
  code = appendDownstream(pOperator, &pDownstream, 1);
840,983✔
2404
  if (code != 0) {
840,983✔
2405
    goto _error;
×
2406
  }
2407

2408
  *pOptrOut = pOperator;
840,983✔
2409
  return code;
840,983✔
2410

2411
_error:
×
2412

2413
  if (pExtW != NULL) {
×
2414
    destroyExternalWindowOperatorInfo(pExtW);
×
2415
  }
2416

2417
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
2418
  pTaskInfo->code = code;
×
2419
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
×
2420
  return code;
×
2421
}
2422

2423

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc