• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4844

09 Nov 2025 03:44PM UTC coverage: 63.058% (-0.5%) from 63.514%
#4844

push

travis-ci

web-flow
test: minor changes (#33510)

117164 of 185804 relevant lines covered (63.06%)

115657269.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.27
/source/libs/executor/src/externalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22
#include "cmdnodes.h"
23

24
typedef struct SBlockList {
25
  const SSDataBlock* pSrcBlock;
26
  SList*             pBlocks;
27
  int32_t            blockRowNumThreshold;
28
} SBlockList;
29

30

31
/*
 * Function-pointer type stored in SExternalWindowOperator.getWinFp.
 * NOTE(review): the meaning of each argument is defined by the concrete
 * implementations, which are not visible in this part of the file.
 */
typedef int32_t (*extWinGetWinFp)(SOperatorInfo*, int64_t*, int32_t*, SDataBlockInfo*,
                                  SExtWinTimeWindow**, int32_t*);
32

33
typedef struct SExtWindowStat {
34
  int64_t resBlockCreated;
35
  int64_t resBlockDestroyed;
36
  int64_t resBlockRecycled;
37
  int64_t resBlockReused;
38
  int64_t resBlockAppend;
39
} SExtWindowStat;
40

41
typedef struct SExternalWindowOperator {
42
  SOptrBasicInfo     binfo;
43
  SExprSupp          scalarSupp;
44
  int32_t            primaryTsIndex;
45
  EExtWinMode        mode;
46
  bool               multiTableMode;
47
  bool               inputHasOrder;
48
  SArray*            pWins;           // SArray<SExtWinTimeWindow>
49
  SArray*            pPseudoColInfo;  
50
  STimeRangeNode*    timeRangeExpr;
51
  
52
  extWinGetWinFp     getWinFp;
53

54
  bool               blkWinStartSet;
55
  int32_t            blkWinStartIdx;
56
  int32_t            blkWinIdx;
57
  int32_t            blkRowStartIdx;
58
  int32_t            outputWinId;
59
  int32_t            outWinIdx;
60

61
  // for project&indefRows
62
  SList*             pFreeBlocks;    // SList<SSDatablock*+SAarray*>
63
  SArray*            pOutputBlocks;  // SArray<SList*>, for each window, we have a list of blocks
64
  SListNode*         pLastBlkNode; 
65
  SSDataBlock*       pTmpBlock;
66
  
67
  // for agg
68
  SAggSupporter      aggSup;
69
  STimeWindowAggSupp twAggSup;
70

71
  int32_t            resultRowCapacity;
72
  SResultRow*        pResultRow;
73

74
  int64_t            lastSKey;
75
  int32_t            lastWinId;
76
  SSDataBlock*       pEmptyInputBlock;
77
  bool               hasCountFunc;
78
  SExtWindowStat     stat;
79
  SArray*            pWinRowIdx;
80
} SExternalWindowOperator;
81

82

83
/*
 * Append one result block, together with its window-index array, to pList.
 *
 * A node is taken from pExtW->pFreeBlocks when one is available; otherwise a
 * new block (capacity of at least max(rows, 4096)) and a new index array are
 * allocated. Each list node stores two pointers back to back:
 * { SSDataBlock*, SArray* }.
 *
 * On success *ppBlock / *ppIdx receive the block and index array and 0 is
 * returned. On failure the error code is returned, the outputs are left
 * untouched, and everything allocated here (block and index array) is freed.
 */
static int32_t extWinBlockListAddBlock(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
  SSDataBlock* pRes = NULL;
  SArray*      pIdx = NULL;
  int32_t      code = 0, lino = 0;

  if (listNEles(pExtW->pFreeBlocks) > 0) {
    // Reuse a recycled node: move it from the free list to the tail of pList.
    SListNode* pNode = tdListPopHead(pExtW->pFreeBlocks);
    *ppBlock = *(SSDataBlock**)pNode->data;
    *ppIdx = *(SArray**)((SArray**)pNode->data + 1);
    tdListAppendNode(pList, pNode);
    pExtW->stat.resBlockReused++;
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pExtW->binfo.pRes, false, &pRes));
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, TMAX(rows, 4096)));

    pIdx = taosArrayInit(10, sizeof(int64_t));
    TSDB_CHECK_NULL(pIdx, code, lino, _exit, terrno);

    void* res[2] = {pRes, pIdx};
    TAOS_CHECK_EXIT(tdListAppend(pList, res));

    *ppBlock = pRes;
    *ppIdx = pIdx;
    pExtW->stat.resBlockCreated++;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // Only the allocation path can fail, so both objects are still owned here.
    blockDataDestroy(pRes);
    taosArrayDestroy(pIdx);
  }

  return code;
}
115

116
/*
 * Return the block (and its window-index array) that the next `rows` rows
 * should be written to.
 *
 * The tail block of pList is used when it exists and still has room for
 * `rows` more rows; otherwise a new block is appended through
 * extWinBlockListAddBlock(). Returns 0 on success or the error code of the
 * append.
 */
static int32_t extWinGetLastBlockFromList(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
  int32_t      code = 0, lino = 0;
  SSDataBlock* pLast = NULL;
  SListNode*   pTail = TD_DLIST_TAIL(pList);

  bool needNewBlock = (NULL == pTail);
  if (!needNewBlock) {
    pLast = *(SSDataBlock**)pTail->data;
    needNewBlock = (pLast->info.rows + rows) > pLast->info.capacity;
  }

  if (needNewBlock) {
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
  } else {
    // The index array pointer is stored right after the block pointer.
    *ppBlock = pLast;
    *ppIdx = *(SArray**)((SSDataBlock**)pTail->data + 1);
    pExtW->stat.resBlockAppend++;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
144

145
static void extWinDestroyBlockList(void* p) {
17,121,280✔
146
  if (NULL == p) {
17,121,280✔
147
    return;
×
148
  }
149

150
  SListNode* pTmp = NULL;
17,121,280✔
151
  SList** ppList = (SList**)p;
17,121,280✔
152
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
17,121,280✔
153
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
154
    while (pNode) {
×
155
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
156
      blockDataDestroy(pBlock);
×
157
      SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
×
158
      taosArrayDestroy(pIdx);
×
159
      pTmp = pNode;
×
160
      pNode = pNode->dl_next_;
×
161
      taosMemoryFree(pTmp);
×
162
    }
163
  }
164
  taosMemoryFree(*ppList);
17,121,280✔
165
}
166

167

168
/*
 * Hand a block node back to the operator.
 *
 * While the free list holds fewer than 10 nodes, the block and its index
 * array are emptied and the node is pushed to the front of
 * pExtW->pFreeBlocks. Otherwise the block, the array and the node are
 * destroyed. In both cases *ppNode is NULL afterwards. A NULL ppNode or
 * NULL *ppNode is a no-op.
 */
static void extWinRecycleBlkNode(SExternalWindowOperator* pExtW, SListNode** ppNode) {
  if (ppNode == NULL || *ppNode == NULL) {
    return;
  }

  SListNode*   pNode = *ppNode;
  SSDataBlock* pBlk = *(SSDataBlock**)pNode->data;
  SArray*      pWinIdx = *(SArray**)((SArray**)pNode->data + 1);

  if (listNEles(pExtW->pFreeBlocks) < 10) {
    blockDataCleanup(pBlk);
    taosArrayClear(pWinIdx);
    tdListPrependNode(pExtW->pFreeBlocks, pNode);
    *ppNode = NULL;
    pExtW->stat.resBlockRecycled++;
    return;
  }

  // Free list is full: release everything instead of caching it.
  blockDataDestroy(pBlk);
  taosArrayDestroy(pWinIdx);
  taosMemoryFreeClear(*ppNode);
  pExtW->stat.resBlockDestroyed++;
}
190

191
/*
 * Recycle every node of the block list referenced by `p` (an SList** slot)
 * through extWinRecycleBlkNode(), then free the list header.
 */
static void extWinRecycleBlockList(SExternalWindowOperator* pExtW, void* p) {
  SList** ppList = (SList**)p;
  if (ppList == NULL) {
    return;
  }

  if ((*ppList) != NULL && TD_DLIST_NELES(*ppList) > 0) {
    SListNode* pNext = NULL;
    for (SListNode* pCur = TD_DLIST_HEAD(*ppList); pCur != NULL; pCur = pNext) {
      // Recycling relinks or frees the node, so fetch the successor first.
      pNext = pCur->dl_next_;

      SListNode* pVictim = pCur;
      extWinRecycleBlkNode(pExtW, &pVictim);
    }
  }

  taosMemoryFree(*ppList);
}
208
/*
 * Destroy one block node: the data block, its window-index array and the
 * node memory. Counts the destruction in pInfo->stat. NULL pNode is a no-op.
 */
static void extWinDestroyBlkNode(SExternalWindowOperator* pInfo, SListNode* pNode) {
  if (pNode == NULL) {
    return;
  }

  // Node payload layout: { SSDataBlock*, SArray* }.
  blockDataDestroy(*(SSDataBlock**)pNode->data);
  taosArrayDestroy(*(SArray**)((SArray**)pNode->data + 1));
  taosMemoryFree(pNode);

  pInfo->stat.resBlockDestroyed++;
}
223

224

225
void destroyExternalWindowOperatorInfo(void* param) {
91,303✔
226
  if (NULL == param) {
91,303✔
227
    return;
×
228
  }
229
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
91,303✔
230
  cleanupBasicInfo(&pInfo->binfo);
91,303✔
231

232
  taosArrayDestroyEx(pInfo->pOutputBlocks, extWinDestroyBlockList);
91,303✔
233
  taosArrayDestroy(pInfo->pWins);
91,303✔
234
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
91,303✔
235
  taosArrayDestroy(pInfo->pWinRowIdx);
91,303✔
236
  
237
  taosArrayDestroy(pInfo->pPseudoColInfo);
91,303✔
238
  blockDataDestroy(pInfo->pTmpBlock);
91,303✔
239
  blockDataDestroy(pInfo->pEmptyInputBlock);
91,303✔
240

241
  extWinDestroyBlkNode(pInfo, pInfo->pLastBlkNode);
91,303✔
242
  if (pInfo->pFreeBlocks) {
91,303✔
243
    SListNode *node;
244
    while ((node = TD_DLIST_HEAD(pInfo->pFreeBlocks)) != NULL) {
836✔
245
      TD_DLIST_POP(pInfo->pFreeBlocks, node);
418✔
246
      extWinDestroyBlkNode(pInfo, node);
418✔
247
    }
248
    taosMemoryFree(pInfo->pFreeBlocks);
418✔
249
  }
250
  
251
  cleanupAggSup(&pInfo->aggSup);
91,303✔
252
  cleanupExprSupp(&pInfo->scalarSupp);
91,303✔
253
  taosMemoryFreeClear(pInfo->pResultRow);
91,303✔
254

255
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
91,303✔
256

257
  qDebug("ext window stat at destroy, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
91,303✔
258
      pInfo->stat.resBlockCreated, pInfo->stat.resBlockDestroyed, pInfo->stat.resBlockRecycled, 
259
      pInfo->stat.resBlockReused, pInfo->stat.resBlockAppend);
260

261
  taosMemoryFreeClear(pInfo);
91,303✔
262
}
263

264
static int32_t extWinOpen(SOperatorInfo* pOperator);
265
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
266

267
typedef struct SMergeAlignedExternalWindowOperator {
268
  SExternalWindowOperator* pExtW;
269
  int64_t curTs;
270
  SResultRow*  pResultRow;
271
} SMergeAlignedExternalWindowOperator;
272

273
void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
2,834✔
274
  SMergeAlignedExternalWindowOperator* pMlExtInfo = (SMergeAlignedExternalWindowOperator*)pOperator;
2,834✔
275
  destroyExternalWindowOperatorInfo(pMlExtInfo->pExtW);
2,834✔
276
  taosMemoryFreeClear(pMlExtInfo);
2,834✔
277
}
2,834✔
278

279
/*
 * Return the raw timestamp array of column `primaryTsIndex` in pBlock, or
 * NULL when the block has no column array or its data is not loaded.
 *
 * When the block window is still {0, 0} it is refreshed with
 * blockDataUpdateTsWindow(). Failures do not return: the error is stored in
 * pTaskInfo->code and control leaves through T_LONG_JMP.
 */
int64_t* extWinExtractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
  if (pTsCol == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  TSKEY* pTs = (int64_t*)pTsCol->pData;

  bool windowUnset = (pBlock->info.window.skey == 0) && (pBlock->info.window.ekey == 0);
  if (windowUnset) {
    int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return pTs;
}
302

303
/* Read the current window index from the task's stream runtime info. */
static int32_t extWinGetCurWinIdx(SExecTaskInfo* pTaskInfo) {
  int32_t curWinIdx = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  return curWinIdx;
}
306

307
/* Advance the current window index in the task's stream runtime info by one. */
static void extWinIncCurWinIdx(SOperatorInfo* pOperator) {
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx++;
}
×
311

312
/* Store `idx` as the current window index in the task's stream runtime info. */
static void extWinSetCurWinIdx(SOperatorInfo* pOperator, int32_t idx) {
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
}
24,313,340✔
316

317

318
/* Advance the current output window index by one. */
static void extWinIncCurWinOutIdx(SStreamRuntimeInfo* pStreamRuntimeInfo) {
  pStreamRuntimeInfo->funcInfo.curOutIdx += 1;
}
836✔
321

322

323
/*
 * Return the window that follows the current one in pExtW->pWins, or NULL
 * when the current window is the last.
 */
static const STimeWindow* extWinGetNextWin(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
  int32_t nextIdx = extWinGetCurWinIdx(pTaskInfo) + 1;
  if (nextIdx >= pExtW->pWins->size) {
    return NULL;
  }
  return taosArrayGet(pExtW->pWins, nextIdx);
}
328

329

330
/*
 * Record in pIdx where the rows of window `currWinIdx` start inside pBlock.
 *
 * Each pIdx element is 8 bytes, holding two int32 values in this order:
 * [0] window index, [1] index of the window's first row in pBlock, computed
 * as pBlock->info.rows - rows. Nothing is appended when the last recorded
 * entry already belongs to `currWinIdx`.
 *
 * The entry is built as an int32_t[2] rather than by writing through
 * int32_t pointers into an int64_t object: the bytes pushed are identical,
 * without accessing an object through an incompatible pointer type.
 *
 * Returns 0 on success or terrno when the push fails.
 */
static int32_t extWinAppendWinIdx(SExecTaskInfo*       pTaskInfo, SArray* pIdx, SSDataBlock* pBlock, int32_t currWinIdx, int32_t rows) {
  int32_t        code = 0, lino = 0;
  const int32_t* pLast = taosArrayGetLast(pIdx);

  if (pLast != NULL && pLast[0] == currWinIdx) {
    return code;
  }

  // Must stay the same size as the array element (sizeof(int64_t)).
  int32_t entry[2] = {currWinIdx, (int32_t)(pBlock->info.rows - rows)};

  TSDB_CHECK_NULL(taosArrayPush(pIdx, entry), code, lino, _exit, terrno);

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
356

357

358
static int32_t mergeAlignExtWinSetOutputBuf(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
3,693✔
359
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
360
  if (*pResult == NULL) {
3,693✔
361
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
2,439✔
362
    if (!*pResult) {
2,439✔
363
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
364
      return terrno;
×
365
    }
366
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
2,439✔
367
  }
368
  
369
  (*pResult)->win = *pWin;
3,693✔
370
  (*pResult)->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
3,693✔
371
  
372
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
3,693✔
373
}
374

375

376
/*
 * Find the window whose start key equals `ts`, scanning forward from the
 * current window index.
 *
 * On a match the current window index is moved to that window, *ppWin is set
 * and TSDB_CODE_SUCCESS is returned. If a window starting after `ts` is
 * reached first, or no window matches, TSDB_CODE_STREAM_INTERNAL_ERROR is
 * returned.
 */
static int32_t mergeAlignExtWinGetWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts, STimeWindow** ppWin) {
  int32_t firstIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);

  // TODO handle desc order
  for (int32_t i = firstIdx; i < pExtW->pWins->size; ++i) {
    STimeWindow* pCand = taosArrayGet(pExtW->pWins, i);

    if (ts > pCand->skey) {
      continue;  // this window starts before ts, keep looking
    }

    if (ts < pCand->skey) {
      qError("invalid ts %" PRId64 " for current window idx %d skey %" PRId64, ts, i, pCand->skey);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    extWinSetCurWinIdx(pOperator, i);
    *ppWin = pCand;
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid ts %" PRId64 " to find merge aligned ext window, size:%d", ts, (int32_t)pExtW->pWins->size);
  return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
395

396
/*
 * Finalize the current result row into pResultBlock and, when the row
 * produced output rows, record the window's starting row in pWinRowIdx.
 * Returns 0 or the error from extWinAppendWinIdx().
 */
static int32_t mergeAlignExtWinFinalizeResult(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pResultBlock) {
  int32_t                              code = 0, lino = 0;
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SResultRow*                          pRow = pMlExtInfo->pResultRow;

  finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, &pOperator->exprSupp, pResultBlock,
                     pOperator->pTaskInfo);

  if (pRow->numOfRows > 0) {
    TAOS_CHECK_EXIT(
        extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResultBlock, pRow->winIdx, pRow->numOfRows));
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
417

418
/*
 * Aggregate one input block for the merge-aligned operator.
 *
 * The block is cut into runs of rows that share the same timestamp. Each run
 * is aggregated into the result row of the window whose start key equals
 * that timestamp. When the timestamp changes, the finished window is
 * finalized into pResultBlock and the result row is reset for the next
 * window. The last run of the block is aggregated but left open, so it can
 * continue in the next block.
 *
 * Any failure leaves through T_LONG_JMP; a normal return is always 0.
 */
static int32_t mergeAlignExtWinAggDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                           pSup = &pOperator->exprSupp;
  int32_t                              code = 0, lino = 0;
  STimeWindow*                         pWin = NULL;
  int32_t                              startPos = 0;
  int32_t                              currPos = 0;

  int64_t* tsCols = extWinExtractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
  TSKEY    ts = getStartTsKey(&pBlock->info.window, tsCols);

  code = mergeAlignExtWinGetWinFromTs(pOperator, pExtW, ts, &pWin);
  if (code) {
    qError("failed to get time window for ts:%" PRId64 ", prim ts index:%d, error:%s", ts, pExtW->primaryTsIndex, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

  // A window left open by the previous block ends here if this block starts a different one.
  if (pMlExtInfo->curTs != INT64_MIN && pMlExtInfo->curTs != pWin->skey) {
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
  }

  TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));
  pMlExtInfo->curTs = pWin->skey;

  for (currPos = startPos + 1; currPos < pBlock->info.rows; ++currPos) {
    if (tsCols[currPos] == pMlExtInfo->curTs) {
      continue;
    }

    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
      pMlExtInfo->curTs, startPos, currPos, tsCols[currPos]);

    // Close the run [startPos, currPos) and emit its window.
    TAOS_CHECK_EXIT(applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
                                           currPos - startPos, pBlock->info.rows, pSup->numOfExprs));
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));

    // Open the window for the run that starts at currPos.
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[currPos], &pWin));
    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
    startPos = currPos;

    TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));
    pMlExtInfo->curTs = pWin->skey;
  }

  // Trailing run: aggregated here, finalized by a later call.
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
                                         currPos - startPos, pBlock->info.rows, pSup->numOfExprs);

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return code;
}
480

481
/*
 * For every distinct timestamp in pInput, locate its window and record in
 * pWinRowIdx where that window's rows start inside pResult.
 *
 * pOperator is the merge-aligned operator, so pOperator->info is a
 * SMergeAlignedExternalWindowOperator and the external window state must be
 * reached through its pExtW member (as mergeAlignExtWinAggDo does), not by
 * casting pOperator->info directly.
 *
 * NOTE(review): tsCols is dereferenced without a NULL check;
 * extWinExtractTsCol() returns NULL for blocks whose data is not loaded --
 * confirm that such blocks cannot reach this path.
 *
 * Returns 0 on success or the first error encountered.
 */
static int32_t mergeAlignExtWinBuildWinRowIdx(SOperatorInfo* pOperator, SSDataBlock* pInput, SSDataBlock* pResult) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  int64_t* tsCols = extWinExtractTsCol(pInput, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  STimeWindow* pWin = NULL;
  int32_t code = 0, lino = 0;
  int64_t prevTs = INT64_MIN;

  for (int32_t i = 0; i < pInput->info.rows; ++i) {
    // Rows sharing the previous timestamp belong to the window already recorded.
    if (prevTs == tsCols[i]) {
      continue;
    }

    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[i], &pWin));
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResult, extWinGetCurWinIdx(pOperator->pTaskInfo), pInput->info.rows - i));

    prevTs = tsCols[i];
  }

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
507

508
/*
 * Scalar (projection) path of the merge-aligned operator: apply the scalar
 * expressions to pBlock, writing into pResultBlock, then build the
 * window/row index for the produced rows.
 *
 * pOperator->info is a SMergeAlignedExternalWindowOperator here, so the
 * external window state is taken from its pExtW member. pResultRowInfo is
 * not used by this path.
 *
 * Returns 0 on success or the first error encountered.
 */
static int32_t mergeAlignExtWinProjectDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                            SSDataBlock* pResultBlock) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SExprSupp*                           pExprSup = &pExtW->scalarSupp;
  int32_t                              code = 0, lino = 0;

  TAOS_CHECK_EXIT(projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
                        GET_STM_RTINFO(pOperator->pTaskInfo)));

  TAOS_CHECK_EXIT(mergeAlignExtWinBuildWinRowIdx(pOperator, pBlock, pResultBlock));

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
527

528
/*
 * Pull blocks from the downstream operator and process them until the result
 * block reaches the output threshold or the input is exhausted.
 *
 * When the input ends, the still-open window (AGG mode only) is finalized
 * and the operator is marked completed. On success the window/row index
 * array is published through funcInfo.pStreamBlkWinIdx. Errors do not
 * return: they are stored in pTaskInfo->code and leave through T_LONG_JMP.
 */
void mergeAlignExtWinDo(SOperatorInfo* pOperator) {
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SSDataBlock*                         pRes = pExtW->binfo.pRes;
  SExprSupp*                           pSup = &pOperator->exprSupp;
  SResultRowInfo*                      pRowInfo = &pExtW->binfo.resultRowInfo;
  int32_t                              code = 0, lino = 0;

  taosArrayClear(pExtW->pWinRowIdx);
  blockDataCleanup(pRes);

  for (;;) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);

    if (NULL == pBlock) {
      // close last time window
      if (EEXT_MODE_AGG == pExtW->mode && pMlExtInfo->curTs != INT64_MIN) {
        TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pRowInfo, pRes));
      }
      setOperatorCompleted(pOperator);
      break;
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _exit);

    printDataBlock(pBlock, __func__, "externalwindowAlign", pTaskInfo->id.queryId);
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);

    if (EEXT_MODE_SCALAR != pExtW->mode) {
      TAOS_CHECK_EXIT(mergeAlignExtWinAggDo(pOperator, pRowInfo, pBlock, pRes));
    } else {
      TAOS_CHECK_EXIT(mergeAlignExtWinProjectDo(pOperator, pRowInfo, pBlock, pRes));
    }

    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }

  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
2,834✔
581

582
/*
 * getNext entry point of the merge-aligned external window operator.
 *
 * On the first call after a reset (pWins empty) one window is created per
 * entry of funcInfo.pStreamPesudoFuncVals, as [wstart, wstart + 1]. Then
 * mergeAlignExtWinDo() runs and *ppRes is set to the result block, or NULL
 * when it holds no rows. Returns TSDB_CODE_SUCCESS with *ppRes == NULL once
 * the operator is done. Errors leave through T_LONG_JMP.
 */
static int32_t mergeAlignExtWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  int32_t                              code = 0, lino = 0;

  if (OP_EXEC_DONE == pOperator->status) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pRes = pExtW->binfo.pRes;
  blockDataCleanup(pRes);

  if (taosArrayGetSize(pExtW->pWins) <= 0) {
    size_t       size = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
    STimeWindow* pWin = taosArrayReserve(pExtW->pWins, size);
    TSDB_CHECK_NULL(pWin, code, lino, _exit, terrno);

    for (int32_t i = 0; i < size; ++i) {
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
      STimeWindow*        pDst = &pWin[i];
      pDst->skey = pParam->wstart;
      pDst->ekey = pParam->wstart + 1;
    }

    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  }

  mergeAlignExtWinDo(pOperator);

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;
  if (rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pRes;
  }

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
626

627
/*
 * Reset the merge-aligned operator so it can be executed again: status back
 * to OP_NOT_OPENED, window list cleared, basic state and aggregation support
 * reset, and the time-window column rebuilt.
 * Returns 0 or the first error from resetAggSup() / initExecTimeWindowInfo().
 */
int32_t resetMergeAlignedExtWinOperator(SOperatorInfo* pOperator) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedIntervalPhysiNode*      pPhynode = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;

  pOperator->status = OP_NOT_OPENED;
  taosArrayClear(pExtW->pWins);
  resetBasicOperatorState(&pExtW->binfo);

  // No window is open after a reset.
  pMlExtInfo->pResultRow = NULL;
  pMlExtInfo->curTs = INT64_MIN;

  int32_t code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                             &pTaskInfo->storageAPI.functionStore);
  if (code != 0) {
    return code;
  }

  colDataDestroy(&pExtW->twAggSup.timeWindowData);
  return initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
}
649

650
int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
2,834✔
651
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
652
  SMergeAlignedIntervalPhysiNode* pPhynode = (SMergeAlignedIntervalPhysiNode*)pNode;
2,834✔
653
  int32_t code = 0;
2,834✔
654
  int32_t lino = 0;
2,834✔
655
  SMergeAlignedExternalWindowOperator* pMlExtInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
2,834✔
656
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,834✔
657

658
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
2,834✔
659
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
2,834✔
660
  }
661
  pOperator->pPhyNode = pNode;
2,834✔
662
  if (!pMlExtInfo || !pOperator) {
2,834✔
663
    code = terrno;
×
664
    goto _error;
×
665
  }
666

667
  pMlExtInfo->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
2,834✔
668
  if (!pMlExtInfo->pExtW) {
2,834✔
669
    code = terrno;
×
670
    goto _error;
×
671
  }
672

673
  SExternalWindowOperator* pExtW = pMlExtInfo->pExtW;
2,834✔
674
  SExprSupp* pSup = &pOperator->exprSupp;
2,834✔
675
  pSup->hasWindowOrGroup = true;
2,834✔
676
  pMlExtInfo->curTs = INT64_MIN;
2,834✔
677

678
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
2,834✔
679
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
2,834✔
680
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
2,834✔
681
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
2,834✔
682

683
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,834✔
684
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2,834✔
685

686
  int32_t num = 0;
2,834✔
687
  SExprInfo* pExprInfo = NULL;
2,834✔
688
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
2,834✔
689
  QUERY_CHECK_CODE(code, lino, _error);
2,834✔
690

691
  if (pExtW->mode == EEXT_MODE_AGG) {
2,834✔
692
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
2,834✔
693
                      &pTaskInfo->storageAPI.functionStore);
694
    QUERY_CHECK_CODE(code, lino, _error);
2,834✔
695
  }
696

697
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
2,834✔
698
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,834✔
699
  initBasicInfo(&pExtW->binfo, pResBlock);
2,834✔
700

701
  pExtW->pWins = taosArrayInit(4096, sizeof(STimeWindow));
2,834✔
702
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
2,834✔
703

704
  pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
2,834✔
705
  TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
2,834✔
706

707
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
2,834✔
708
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
2,834✔
709
  QUERY_CHECK_CODE(code, lino, _error);
2,834✔
710
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMlExtInfo, pTaskInfo);
2,834✔
711
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignExtWinNext, NULL,
2,834✔
712
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
713
                                         optrDefaultGetNextExtFn, NULL);
714
  setOperatorResetStateFn(pOperator, resetMergeAlignedExtWinOperator);
2,834✔
715

716
  code = appendDownstream(pOperator, &pDownstream, 1);
2,834✔
717
  QUERY_CHECK_CODE(code, lino, _error);
2,834✔
718
  *ppOptrOut = pOperator;
2,834✔
719
  return code;
2,834✔
720
  
721
_error:
×
722
  if (pMlExtInfo) destroyMergeAlignedExternalWindowOperator(pMlExtInfo);
×
723
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
724
  pTaskInfo->code = code;
×
725
  return code;
×
726
}
727

728
static int32_t resetExternalWindowExprSupp(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
148,928✔
729
                                           SExternalWindowPhysiNode* pPhynode) {
730
  int32_t    code = 0, lino = 0, num = 0;
148,928✔
731
  SExprInfo* pExprInfo = NULL;
148,928✔
732
  cleanupExprSuppWithoutFilter(&pExtW->scalarSupp);
148,928✔
733

734
  SNodeList* pNodeList = NULL;
148,928✔
735
  if (pPhynode->window.pProjs) {
148,928✔
736
    pNodeList = pPhynode->window.pProjs;
×
737
  } else {
738
    pNodeList = pPhynode->window.pExprs;
148,928✔
739
  }
740

741
  code = createExprInfo(pNodeList, NULL, &pExprInfo, &num);
148,928✔
742
  QUERY_CHECK_CODE(code, lino, _error);
148,928✔
743
  code = initExprSupp(&pExtW->scalarSupp, pExprInfo, num, &pTaskInfo->storageAPI.functionStore);
148,928✔
744
  QUERY_CHECK_CODE(code, lino, _error);
148,928✔
745
  return code;
148,928✔
746
_error:
×
747
  if (code != TSDB_CODE_SUCCESS) {
×
748
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
749
    pTaskInfo->code = code;
×
750
  }
751
  return code;
×
752
}
753

754
// Reset callback for the external window operator.
//
// Puts the operator back into OP_NOT_OPENED, re-initialises the result row info,
// clears the window array, recycles the last block node, rebuilds the scalar
// expression supp and the time-window column, and finally rewinds the output
// window cursor. The cursor fields (outWinIdx / lastSKey) are only rewound when
// every preceding step succeeded.
//
// Returns 0 on success or the first error code encountered.
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
  int32_t                   code = 0, lino = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SExternalWindowOperator*  pWinOp = pOperator->info;
  SExternalWindowPhysiNode* pPlan = (SExternalWindowPhysiNode*)pOperator->pPhyNode;

  pOperator->status = OP_NOT_OPENED;
  initResultRowInfo(&pWinOp->binfo.resultRowInfo);

  // window bookkeeping
  pWinOp->lastWinId = -1;
  pWinOp->outputWinId = 0;
  taosArrayClear(pWinOp->pWins);
  extWinRecycleBlkNode(pWinOp, &pWinOp->pLastBlkNode);

  // scalar expressions and the time window column are rebuilt from scratch
  TAOS_CHECK_EXIT(resetExternalWindowExprSupp(pWinOp, pTaskInfo, pPlan));
  colDataDestroy(&pWinOp->twAggSup.timeWindowData);
  TAOS_CHECK_EXIT(initExecTimeWindowInfo(&pWinOp->twAggSup.timeWindowData, &pTaskInfo->window));

  pWinOp->lastSKey = INT64_MIN;
  pWinOp->outWinIdx = 0;

  qDebug("%s ext window stat at reset, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
      pTaskInfo->id.str, pWinOp->stat.resBlockCreated, pWinOp->stat.resBlockDestroyed, pWinOp->stat.resBlockRecycled, 
      pWinOp->stat.resBlockReused, pWinOp->stat.resBlockAppend);

_exit:

  if (code != 0) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
796

797
// Node walker callback: sets *(bool*)res to true and stops the walk as soon as
// a function node is found whose funcId (or, when hasOriginalFunc is set, whose
// originalFuncId) satisfies fmIsCountLikeFunc. Any other node continues the walk.
static EDealRes extWinHasCountLikeFunc(SNode* pNode, void* res) {
  if (QUERY_NODE_FUNCTION != nodeType(pNode)) {
    return DEAL_RES_CONTINUE;
  }

  SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
  bool           countLike = fmIsCountLikeFunc(pFuncNode->funcId);
  if (!countLike && pFuncNode->hasOriginalFunc) {
    countLike = fmIsCountLikeFunc(pFuncNode->originalFuncId);
  }

  if (!countLike) {
    return DEAL_RES_CONTINUE;
  }

  *(bool*)res = true;
  return DEAL_RES_END;
}
807

808

809
// Builds a one-row, all-NULL input block for the operator's expressions.
//
// Nothing is created (and *ppBlock is left untouched) when
// tsCountAlwaysReturnValue is off or the operator has no count-like function.
// Otherwise the block gets one NULL-typed column for every slot id up to the
// largest slot referenced by a column parameter of the operator's expressions,
// and row 0 of every column is set to NULL.
//
// Returns TSDB_CODE_SUCCESS on success; on failure the partially built block is
// destroyed and the error code is returned.
static int32_t extWinCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SSDataBlock* pBlock = NULL;
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SExternalWindowOperator* pExtW = pOperator->info;

  if (!pExtW->hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    // template for every placeholder column appended below
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      // only column parameters need a slot in the block; other kinds are ignored
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
        if (slotId >= numOfCols) {
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
          QUERY_CHECK_CODE(code, lino, _end);

          // pad the column array so that index slotId exists
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
          }
        }
      }
    }
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
    colDataSetNULL(pColInfoData, 0);
  }
  *ppBlock = pBlock;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    // Report the line recorded by the failing check, not the line of this log statement.
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
875

876

877

878
static int extWinTsWinCompare(const void* pLeft, const void* pRight) {
6,364,609✔
879
  int64_t ts = *(int64_t*)pLeft;
6,364,609✔
880
  SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)pRight;
6,364,609✔
881
  if (ts < pWin->tw.skey) {
6,364,609✔
882
    return -1;
2,233,394✔
883
  }
884
  if (ts >= pWin->tw.ekey) {
4,131,215✔
885
    return 1;
121,330✔
886
  }
887

888
  return 0;
4,009,885✔
889
}
890

891

892
// Finds the first (row, window) pair of a block such that the row's timestamp
// falls into the window.
//
// Row 0 is looked up with taosArraySearchIdx; when it matches, *startPos is set
// to 0 and the window index is returned. Otherwise rows 1..rowNum-1 are scanned
// together with the window array using a window cursor that only moves forward.
//
// Returns the window index (with *startPos set to the matching row), or -1 when
// no row of the block falls into any window (*startPos is then left untouched).
static int32_t extWinGetMultiTbWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol, int64_t rowNum, int32_t* startPos) {
  int32_t hit = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
  if (hit >= 0) {
    *startPos = 0;
    return hit;
  }

  int32_t winIdx = 0;
  for (int64_t row = 1; row < rowNum; ++row) {
    while (winIdx < pExtW->pWins->size) {
      SExtWinTimeWindow* pCand = TARRAY_GET_ELEM(pExtW->pWins, winIdx);
      if (tsCol[row] < pCand->tw.skey) {
        // row is before this window: try the next row against the same window
        break;
      }

      if (tsCol[row] < pCand->tw.ekey) {
        *startPos = row;
        return winIdx;
      }

      ++winIdx;
    }
  }

  return -1;
}
917

918
// Window iterator used through the extWinGetWinFp callback type.
//
// Starting from row *startPos and from the window after pExtW->blkWinIdx (or
// from extWinGetCurWinIdx() on the first call for a block, i.e. when
// blkWinIdx < 0), finds the next window that contains a row of the block.
//
// On a hit: *ppWin is the window, *startPos the first row inside it, *winRows
// the value computed by getNumOfRowsInTimeWindow up to ekey - 1, and the
// current window index is recorded via extWinSetCurWinIdx.
// When the block is exhausted or no window matches, *ppWin is set to NULL.
// Always returns TSDB_CODE_SUCCESS.
static int32_t extWinGetNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  // all rows of the block have been consumed already
  if ((*startPos) >= pInfo->rows) {
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // choose the window to start from
  if (pExtW->blkWinIdx >= 0) {
    pExtW->blkWinIdx++;
  } else {
    pExtW->blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
  }

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pCur->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pCur->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t row = *startPos;

  qDebug("%s %s start to get novlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, row);

  // TODO handle desc order
  while (pExtW->blkWinIdx < pExtW->pWins->size) {
    pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // skip the rows located before the current window; the row cursor is
    // shared by all windows and never moves backwards
    while (row < pInfo->rows && tsCol[row] < pCur->tw.skey) {
      ++row;
    }

    if (row == pInfo->rows) {
      break;
    }

    if (tsCol[row] < pCur->tw.ekey) {
      extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
      *ppWin = pCur;
      *startPos = row;
      *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pCur->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

      qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
          GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pCur->tw.skey, pCur->tw.ekey, *winRows, row, tsCol[row], tsCol[row + *winRows - 1]);

      return TSDB_CODE_SUCCESS;
    }

    // the row is at or after the window end: move on to the next window
    ++pExtW->blkWinIdx;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
986

987
// Window iterator used through the extWinGetWinFp callback type.
//
// Starts from pExtW->blkWinStartIdx on the first call for a block
// (blkWinIdx < 0), otherwise from the window after blkWinIdx. Every window is
// matched against the block starting at pExtW->blkRowStartIdx, which is pushed
// forward past rows that precede the examined window.
//
// On a hit: *ppWin, *startPos and *winRows describe the window and its row
// range, the current window index is recorded via extWinSetCurWinIdx, and when
// the window ends before the block does, blkWinStartIdx is set to the following
// window (blkWinStartSet = true).
// When nothing matches, *ppWin is set to NULL. Always returns TSDB_CODE_SUCCESS.
static int32_t extWinGetOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  // choose the window to start from
  if (pExtW->blkWinIdx >= 0) {
    pExtW->blkWinIdx++;
  } else {
    pExtW->blkWinIdx = pExtW->blkWinStartIdx;
  }

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pCur->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pCur->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s %s start to get ovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);

  // TODO handle desc order
  while (pExtW->blkWinIdx < pExtW->pWins->size) {
    pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // each window restarts from the remembered row; rows before the window
    // start are dropped from the remembered position as they are skipped
    int64_t row = pExtW->blkRowStartIdx;
    while (row < pInfo->rows && tsCol[row] < pCur->tw.skey) {
      ++row;
      pExtW->blkRowStartIdx = row;
    }

    if (row >= pInfo->rows) {
      if (!pExtW->blkWinStartSet) {
        pExtW->blkWinStartIdx = pExtW->blkWinIdx;
      }

      break;
    }

    if (tsCol[row] < pCur->tw.ekey) {
      extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
      *ppWin = pCur;
      *startPos = row;
      *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pCur->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

      qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
          GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pCur->tw.skey, pCur->tw.ekey, *winRows, (int32_t)row, tsCol[row], tsCol[row + *winRows - 1]);

      // the window closes inside this block
      if ((row + *winRows) < pInfo->rows) {
        pExtW->blkWinStartIdx = pExtW->blkWinIdx + 1;
        pExtW->blkWinStartSet = true;
      }

      return TSDB_CODE_SUCCESS;
    }

    // the row is at or after the window end: move on to the next window
    ++pExtW->blkWinIdx;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
1058

1059

1060
// Window iterator used through the extWinGetWinFp callback type.
//
// On the first call for a block (blkWinIdx < 0) the starting window and row are
// located with extWinGetMultiTbWinFromTs. On later calls the search continues
// with the window after blkWinIdx, scanning rows from *startPos.
//
// On a hit: *ppWin, *startPos and *winRows describe the window and its row
// range and the current window index is recorded via extWinSetCurWinIdx.
// When nothing matches, *ppWin is set to NULL. Always returns TSDB_CODE_SUCCESS.
static int32_t extWinGetMultiTbNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  // all rows of the block have been consumed already
  if ((*startPos) >= pInfo->rows) {
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // first call for this block: locate the starting window from the timestamps
  if (pExtW->blkWinIdx < 0) {
    pExtW->blkWinIdx = extWinGetMultiTbWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
    if (pExtW->blkWinIdx < 0) {
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
      *ppWin = NULL;
      return TSDB_CODE_SUCCESS;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  // later calls: continue with the following window
  pExtW->blkWinIdx++;

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pCur->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pCur->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t row = *startPos;

  qDebug("%s %s start to get mnovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, row);

  // TODO handle desc order
  while (pExtW->blkWinIdx < pExtW->pWins->size) {
    pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // skip the rows located before the current window; the row cursor is
    // shared by all windows and never moves backwards
    while (row < pInfo->rows && tsCol[row] < pCur->tw.skey) {
      ++row;
    }

    if (row == pInfo->rows) {
      break;
    }

    if (tsCol[row] < pCur->tw.ekey) {
      extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
      *ppWin = pCur;
      *startPos = row;
      *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pCur->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

      qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
          GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pCur->tw.skey, pCur->tw.ekey, *winRows, row, tsCol[row], tsCol[row + *winRows - 1]);

      return TSDB_CODE_SUCCESS;
    }

    // the row is at or after the window end: move on to the next window
    ++pExtW->blkWinIdx;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
1143

1144
// Finds the first window that contains a row of the block.
//
// Row 0 is looked up with taosArraySearchIdx; on a match the index is walked
// backwards while the preceding window also contains that timestamp, so the
// lowest index of the contiguous run is returned and *startPos is set to 0.
// Otherwise rows 1..rowNum-1 are scanned together with the window array using a
// window cursor that only moves forward.
//
// Returns the window index (with *startPos set to the matching row), or -1 when
// no row of the block falls into any window (*startPos is then left untouched).
static int32_t extWinGetFirstWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol,
                                       int64_t rowNum, int32_t* startPos) {
  int32_t first = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
  if (first >= 0) {
    // extend the hit towards lower indexes while the windows still cover tsCol[0]
    while (first > 0) {
      int32_t            prev = first - 1;
      SExtWinTimeWindow* pPrev = TARRAY_GET_ELEM(pExtW->pWins, prev);
      if (extWinTsWinCompare(tsCol, pPrev) != 0) {
        break;
      }
      first = prev;
    }

    *startPos = 0;
    return first;
  }

  int32_t winIdx = 0;
  for (int64_t row = 1; row < rowNum; ++row) {
    while (winIdx < pExtW->pWins->size) {
      SExtWinTimeWindow* pCand = TARRAY_GET_ELEM(pExtW->pWins, winIdx);
      if (tsCol[row] < pCand->tw.skey) {
        // row is before this window: try the next row against the same window
        break;
      }

      if (tsCol[row] < pCand->tw.ekey) {
        *startPos = row;
        return winIdx;
      }

      ++winIdx;
    }
  }

  return -1;
}
1179

1180
// Window iterator used through the extWinGetWinFp callback type.
//
// On the first call for a block (blkWinIdx < 0) the starting window and row are
// located with extWinGetFirstWinFromTs. On later calls the search continues
// with the window after blkWinIdx; every window is matched against the block
// starting at pExtW->blkRowStartIdx, which is pushed forward past rows that
// precede the examined window.
//
// On a hit: *ppWin, *startPos and *winRows describe the window and its row
// range and the current window index is recorded via extWinSetCurWinIdx.
// When nothing matches, *ppWin is set to NULL. Always returns TSDB_CODE_SUCCESS.
static int32_t extWinGetMultiTbOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  // first call for this block: locate the starting window from the timestamps
  if (pExtW->blkWinIdx < 0) {
    pExtW->blkWinIdx = extWinGetFirstWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
    if (pExtW->blkWinIdx < 0) {
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
      *ppWin = NULL;
      return TSDB_CODE_SUCCESS;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  // later calls: continue with the following window
  pExtW->blkWinIdx++;

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SExtWinTimeWindow* pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pCur->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pCur->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s %s start to get movlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);

  // TODO handle desc order
  while (pExtW->blkWinIdx < pExtW->pWins->size) {
    pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // each window restarts from the remembered row; rows before the window
    // start are dropped from the remembered position as they are skipped
    int64_t row = pExtW->blkRowStartIdx;
    while (row < pInfo->rows && tsCol[row] < pCur->tw.skey) {
      ++row;
      pExtW->blkRowStartIdx = row;
    }

    if (row >= pInfo->rows) {
      break;
    }

    if (tsCol[row] < pCur->tw.ekey) {
      extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
      *ppWin = pCur;
      *startPos = row;
      *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pCur->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

      qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
          GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pCur->tw.skey, pCur->tw.ekey, *winRows, (int32_t)row, tsCol[row], tsCol[row + *winRows - 1]);

      return TSDB_CODE_SUCCESS;
    }

    // the row is at or after the window end: move on to the next window
    ++pExtW->blkWinIdx;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
1257

1258

1259
// Locates the first row after lastEndPos whose timestamp is at or after
// win.skey, for an ascending scan.
//
// Returns:
//    0  and sets *nextPos when such a row exists and lies before win.ekey;
//   -2  when the window ends at or before the block start, or at or before the
//       first remaining row at/after which it could start;
//   -1  when the window starts after the block end, or when the scan reaches
//       the end of the block without finding a start row.
//
// Only TSDB_ORDER_ASC is handled. For any other order no early exit applies and
// the scan ends with -1 instead of reading beyond the block.
// NOTE(review): -1 was chosen for the "ran off the block" case to match the
// "window starts after the block" result - confirm with the callers.
static int32_t extWinGetWinStartPos(STimeWindow win, const SDataBlockInfo* pBlockInfo, int32_t lastEndPos, int32_t order, int32_t* nextPos, int64_t* tsCol) {
  bool ascQuery = order == TSDB_ORDER_ASC;

  if (win.ekey <= pBlockInfo->window.skey && ascQuery) {
    return -2;
  }

  if (win.skey > pBlockInfo->window.ekey && ascQuery) {
    return -1;
  }

  // Bounded scan: never index tsCol at or beyond pBlockInfo->rows.
  for (int64_t pos = (int64_t)lastEndPos + 1; pos < pBlockInfo->rows; ++pos) {
    if (win.ekey <= tsCol[pos] && ascQuery) {
      return -2;
    }

    if (win.skey <= tsCol[pos] && ascQuery) {
      *nextPos = (int32_t)pos;
      return 0;
    }
  }

  return -1;
}
1279

1280
static int32_t extWinAggSetWinOutputBuf(SOperatorInfo* pOperator, SExtWinTimeWindow* win, SExprSupp* pSupp, 
9,887,629✔
1281
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
1282
  int32_t code = 0, lino = 0;
9,887,629✔
1283
  SResultRow* pResultRow = NULL;
9,887,629✔
1284
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
9,887,629✔
1285
  
1286
#if 0
1287
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
1288
                                                  true, tableGroupId, pTaskInfo, true, pAggSup, true);
1289
  if (pResultRow == NULL) {
1290
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
1291
    return pTaskInfo->code;
1292
  }
1293

1294
  qDebug("current result rows num:%d", tSimpleHashGetSize(pAggSup->pResultRowHashTable));
1295

1296
#else
1297
  if (win->winOutIdx >= 0) {
9,887,629✔
1298
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
9,442,435✔
1299
  } else {
1300
    win->winOutIdx = pExtW->outWinIdx++;
445,194✔
1301
    
1302
    qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
445,194✔
1303

1304
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
445,194✔
1305
    
1306
    memset(pResultRow, 0, pAggSup->resultRowSize);
445,194✔
1307

1308
    pResultRow->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
445,194✔
1309
    TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
445,194✔
1310
  }
1311
#endif
1312

1313
  // set time window for current result
1314
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
9,888,010✔
1315

1316
_exit:
9,887,629✔
1317
  
1318
  if (code) {
9,887,629✔
1319
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1320
  }
1321

1322
  return code;
9,887,629✔
1323
}
1324

1325
static int32_t extWinAggDo(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
12,205,898✔
1326
                                  SSDataBlock* pInputBlock) {
1327
  if (forwardRows == 0) return 0;
12,205,898✔
1328
  SExprSupp*               pSup = &pOperator->exprSupp;
12,205,898✔
1329
  SExternalWindowOperator* pExtW = pOperator->info;
12,205,898✔
1330
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
24,398,457✔
1331
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
12,205,898✔
1332

1333
}
1334

1335
// Tells whether the most recently assigned output slot (outWinIdx - 1) is
// considered closed.
//
// false: no slot assigned yet, or multi-table mode without ordered input.
// true : no time range expression (or one that needs no calculation), an empty
//        block list for the last slot, or the last entry of the tail node's
//        index array being smaller than blkWinStartIdx.
// false otherwise.
static bool extWinLastWinClosed(SExternalWindowOperator* pExtW) {
  if (pExtW->outWinIdx <= 0) {
    return false;
  }

  if (pExtW->multiTableMode && !pExtW->inputHasOrder) {
    return false;
  }

  if (NULL == pExtW->timeRangeExpr) {
    return true;
  }

  if (!pExtW->timeRangeExpr->needCalc) {
    return true;
  }

  SList* pBlocks = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outWinIdx - 1);
  if (0 == listNEles(pBlocks)) {
    return true;
  }

  // the node payload holds two pointers; the second one is the index array
  SListNode* pTail = listTail(pBlocks);
  SArray*    pIdxArr = ((SArray**)pTail->data)[1];

  // NOTE(review): the entry is fetched as int64_t* but compared through an
  // int32_t view - confirm the element layout of the index array.
  int64_t* pLast = taosArrayGetLast(pIdxArr);

  return (pLast != NULL) && (*(int32_t*)pLast < pExtW->blkWinStartIdx);
}
1358

1359
// Resolves the output block list of a window and fetches the block (and index
// array) to append to via extWinGetLastBlockFromList.
//
// - window already has an output slot: its list is used;
// - otherwise, if extWinLastWinClosed() holds: the window shares the last
//   assigned slot (outWinIdx - 1);
// - otherwise: a new slot is taken (outWinIdx++), a new list is created, the
//   list previously stored in that slot is recycled and replaced.
//
// Returns TSDB_CODE_SUCCESS or an error code (list allocation failure reports
// terrno).
static int32_t extWinGetWinResBlock(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SList*                   pBlkList = NULL;

  if (pWin->winOutIdx >= 0) {
    pBlkList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
  } else if (extWinLastWinClosed(pExtW)) {
    pWin->winOutIdx = pExtW->outWinIdx - 1;
    pBlkList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
  } else {
    pWin->winOutIdx = pExtW->outWinIdx++;
    pBlkList = tdListNew(POINTER_BYTES * 2);
    TSDB_CHECK_NULL(pBlkList, code, lino, _exit, terrno);

    // swap the new list into the slot, recycling whatever was there before
    SList** ppSlot = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
    extWinRecycleBlockList(pExtW, ppSlot);
    *ppSlot = pBlkList;
  }

  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pBlkList, rows, ppRes, ppIdx));

_exit:

  if (code != 0) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1390

1391
// Projects `rows` rows of pInputBlock, starting at startPos, into the result
// block of window pWin.
//
// The rows are first copied into the operator's temporary block (created from
// the input block on first use, cleaned up on reuse), the scalar expressions
// are then applied from the temporary block into the window's result block, and
// finally the window index is appended via extWinAppendWinIdx.
//
// Returns TSDB_CODE_SUCCESS or the first error code encountered.
static int32_t extWinProjectDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExprSupp*               pScalar = &pExtW->scalarSupp;
  SSDataBlock*             pOut = NULL;
  SArray*                  pWinIdxArr = NULL;

  TAOS_CHECK_EXIT(extWinGetWinResBlock(pOperator, rows, pWin, &pOut, &pWinIdxArr));

  qDebug("%s %s win[%" PRId64 ", %" PRId64 "] got res block %p winRowIdx %p, winOutIdx:%d, capacity:%d", 
      pOperator->pTaskInfo->id.str, __func__, pWin->tw.skey, pWin->tw.ekey, pOut, pWinIdxArr, pWin->winOutIdx, pOut->info.capacity);

  // prepare the scratch block
  if (pExtW->pTmpBlock != NULL) {
    blockDataCleanup(pExtW->pTmpBlock);
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
  }

  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));

  qDebug("%s %s start to copy %d rows to tmp blk", pOperator->pTaskInfo->id.str, __func__, rows);
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));

  qDebug("%s %s start to apply project to tmp blk", pOperator->pTaskInfo->id.str, __func__);
  TAOS_CHECK_EXIT(projectApplyFunctionsWithSelect(pScalar->pExprInfo, pOut, pExtW->pTmpBlock, pScalar->pCtx, pScalar->numOfExprs,
        NULL, GET_STM_RTINFO(pOperator->pTaskInfo), true, pScalar->hasIndefRowsFunc));

  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pWinIdxArr, pOut, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));

_exit:

  if (code != 0) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  } else {
    qDebug("%s %s project succeed", pOperator->pTaskInfo->id.str, __func__);
  }

  return code;
}
1430

1431
// Runs the projection for every window that the operator's getWinFp callback
// yields for pInputBlock. Iteration stops when the callback reports no further
// window (NULL) or when any step fails.
//
// Returns TSDB_CODE_SUCCESS or the first error code encountered.
static int32_t extWinProjectOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  bool                     ascScan = (TSDB_ORDER_ASC == pExtW->binfo.inputTsOrder);
  SExtWinTimeWindow*       pCurWin = NULL;
  int32_t                  rowPos = 0;
  int32_t                  rowCnt = 0;

  for (;;) {
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &rowPos, &pInputBlock->info, &pCurWin, &rowCnt));
    if (NULL == pCurWin) {
      break;
    }

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") project start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pOperator->pTaskInfo), pCurWin->tw.skey, pCurWin->tw.ekey, ascScan, rowPos, rowCnt);        

    TAOS_CHECK_EXIT(extWinProjectDo(pOperator, pInputBlock, rowPos, rowCnt, pCurWin));

    // continue after the rows consumed by this window
    rowPos += rowCnt;
  }

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1461

1462
/*
 * Applies the operator's expressions to pBlock and writes the output into pRes.
 *
 * Steps, in order:
 *   1. if the operator has scalar expressions, evaluate them in place on pBlock;
 *   2. bind pBlock as the input of the operator's expression contexts;
 *   3. make sure pRes can hold its current rows plus all rows of pBlock;
 *   4. evaluate the operator's expressions from pBlock into pRes.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered (logged).
 */
static int32_t extWinIndefRowsDoImpl(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExprSupp*               pExprSup = &pOperator->exprSupp;
  SExprSupp*               pScalarSup = &pExtW->scalarSupp;

  // captured up front, before the scalar step touches pBlock
  int32_t tsOrder = pExtW->binfo.inputTsOrder;
  int32_t blkScanFlag = pBlock->info.scanFlag;

  if (NULL != pScalarSup->pExprInfo) {
    TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx,
                                          pScalarSup->numOfExprs, pExtW->pPseudoColInfo,
                                          GET_STM_RTINFO(pOperator->pTaskInfo)));
  }

  TAOS_CHECK_EXIT(setInputDataBlock(pExprSup, pBlock, tsOrder, blkScanFlag, false));

  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));

  TAOS_CHECK_EXIT(projectApplyFunctions(pExprSup->pExprInfo, pRes, pBlock, pExprSup->pCtx, pExprSup->numOfExprs,
                                        pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1492

1493
static int32_t extWinIndefRowsSetWinOutputBuf(SExternalWindowOperator* pExtW, SExtWinTimeWindow* win, SExprSupp* pSupp, 
×
1494
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo, bool reset) {
1495
  int32_t code = 0, lino = 0;
×
1496
  SResultRow* pResultRow = NULL;
×
1497

1498
  pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
×
1499
  
1500
  qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
×
1501

1502
  if (reset) {
×
1503
    memset(pResultRow, 0, pAggSup->resultRowSize);
×
1504
    for (int32_t k = 0; k < pSupp->numOfExprs; ++k) {
×
1505
      SqlFunctionCtx* pCtx = &pSupp->pCtx[k];
×
1506
      pCtx->pOutput = NULL;
×
1507
    }
1508
  }
1509

1510
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
×
1511

1512
  // set time window for current result
1513
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
×
1514

1515
_exit:
×
1516
  
1517
  if (code) {
×
1518
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1519
  }
1520

1521
  return code;
×
1522
}
1523

1524
/*
 * Resolves the output block list for pWin, prepares its result row, and
 * returns the block (and its window-index array) that new rows should go to.
 *
 * Three cases:
 *   - pWin already has an output slot (winOutIdx >= 0): reuse its list and
 *     bind the result row without resetting it.
 *   - extWinLastWinClosed() is true: pWin takes over the most recently
 *     assigned slot (outWinIdx - 1); the result row is reset.
 *   - otherwise: a new slot is claimed (outWinIdx is advanced), whatever list
 *     the slot held is recycled, a fresh list is installed and the result row
 *     is reset.
 *
 * In the last case the slot pointer is looked up and validated BEFORE the new
 * list is allocated, so an invalid slot is reported as an error instead of
 * being dereferenced, and no freshly created list is left unowned.
 *
 * ppRes / ppIdx are filled by extWinGetLastBlockFromList().
 * Returns TSDB_CODE_SUCCESS or the first error encountered (logged).
 */
static int32_t extWinGetSetWinResBlockBuf(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SList*                   pList = NULL;
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;

  if (pWin->winOutIdx >= 0) {
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, false));
  } else {
    if (extWinLastWinClosed(pExtW)) {
      pWin->winOutIdx = pExtW->outWinIdx - 1;
      pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
    } else {
      pWin->winOutIdx = pExtW->outWinIdx++;

      // validate the slot first: it is written through below
      SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
      TSDB_CHECK_NULL(ppList, code, lino, _exit, terrno);

      pList = tdListNew(POINTER_BYTES * 2);
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);

      // hand the slot's previous list back before installing the new one
      extWinRecycleBlockList(pExtW, ppList);
      *ppList = pList;
    }
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, true));
  }

  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1557

1558

1559
/*
 * Processes `rows` rows of pInputBlock, starting at startPos, for window pWin.
 *
 * The target result block is obtained via extWinGetSetWinResBlockBuf(). The
 * row range is copied into the operator's scratch block (pExtW->pTmpBlock,
 * created from pInputBlock on first use and cleaned on later calls), the
 * expressions are evaluated by extWinIndefRowsDoImpl(), and finally the
 * current window index is appended to the block's window-index array.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered (logged).
 */
static int32_t extWinIndefRowsDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SSDataBlock*             pOutBlock = NULL;
  SArray*                  pWinIdxArr = NULL;

  TAOS_CHECK_EXIT(extWinGetSetWinResBlockBuf(pOperator, rows, pWin, &pOutBlock, &pWinIdxArr));

  if (pExtW->pTmpBlock) {
    blockDataCleanup(pExtW->pTmpBlock);
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
  }

  // at least one row of capacity is requested even when rows is 0
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));

  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
  TAOS_CHECK_EXIT(extWinIndefRowsDoImpl(pOperator, pOutBlock, pExtW->pTmpBlock));

  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pWinIdxArr, pOutBlock, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1588

1589

1590
/*
 * Runs the indefinite-rows processing for every external window reported for
 * pInputBlock.
 *
 * pExtW->getWinFp is invoked repeatedly with the timestamp column, the current
 * row position (by pointer) and the block info; it reports the next window and
 * the number of rows to process for it. A NULL window ends the iteration.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error raised by getWinFp or
 * extWinIndefRowsDo (the error is logged before returning).
 */
static int32_t extWinIndefRowsOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  int64_t*                 pTs = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  bool                     isAsc = (TSDB_ORDER_ASC == pExtW->binfo.inputTsOrder);  // only used for logging
  SExtWinTimeWindow*       pCurWin = NULL;
  int32_t                  rowPos = 0;
  int32_t                  rowCnt = 0;

  for (;;) {
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, pTs, &rowPos, &pInputBlock->info, &pCurWin, &rowCnt));
    if (NULL == pCurWin) {
      // no further window for this block
      break;
    }

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") indefRows start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pOperator->pTaskInfo), pCurWin->tw.skey, pCurWin->tw.ekey, isAsc, rowPos, rowCnt);

    TAOS_CHECK_EXIT(extWinIndefRowsDo(pOperator, pInputBlock, rowPos, rowCnt, pCurWin));

    // step past the rows just handled before asking for the next window
    rowPos += rowCnt;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1620

1621
/*
 * Hands out the next buffered result block for the non-aggregate modes.
 *
 * Starting at pExtW->outputWinId, windows whose block list is empty are
 * skipped (advancing both outputWinId and the stream runtime's current output
 * index). From the first non-empty list the head node is popped: its first
 * pointer is the data block, its second pointer is the window-index array that
 * is published through funcInfo.pStreamBlkWinIdx. The node is remembered in
 * pExtW->pLastBlkNode. If that pop emptied the list, the cursor moves on to
 * the next window right away.
 *
 * *ppRes receives the block when it has rows, NULL otherwise. When no block is
 * found, pStreamBlkWinIdx is cleared.
 *
 * Always returns TSDB_CODE_SUCCESS in the code visible here.
 */
static int32_t extWinNonAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  int32_t                  winCnt = pExtW->outWinIdx;
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SSDataBlock*             pOut = NULL;

  while (pExtW->outputWinId < winCnt) {
    SList* pBlkList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);

    if (listNEles(pBlkList) > 0) {
      SListNode* pHead = tdListPopHead(pBlkList);
      void**     ppSlots = (void**)pHead->data;  // [0] = data block, [1] = window-index array

      pOut = (SSDataBlock*)ppSlots[0];
      pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = (SArray*)ppSlots[1];
      pExtW->pLastBlkNode = pHead;

      if (listNEles(pBlkList) <= 0) {
        // this window is drained, continue with the next one on the following call
        pExtW->outputWinId++;
        extWinIncCurWinOutIdx(pTaskInfo->pStreamRuntimeInfo);
      }

      break;
    }

    // nothing buffered for this window, move on
    pExtW->outputWinId++;
    extWinIncCurWinOutIdx(pTaskInfo->pStreamRuntimeInfo);
  }

  if (NULL == pOut) {
    pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = NULL;
    qDebug("%s ext window done", GET_TASKID(pOperator->pTaskInfo));
  } else {
    qDebug("%s result generated, rows:%" PRId64 , GET_TASKID(pOperator->pTaskInfo), pOut->info.rows);
    pOut->info.version = pTaskInfo->version;
    pOut->info.dataLoad = 1;
  }

  *ppRes = (pOut && pOut->info.rows > 0) ? pOut : NULL;

_exit:

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1666

1667
/*
 * Runs the aggregation on the operator's empty-input block for every window
 * that lies between the last processed window and an end index, so that those
 * windows still get a result row.
 *
 * Nothing is filled in when the operator has no empty-input block, or when
 * pWin is given and starts at the same skey as the last processed window.
 *
 * The end index is the last entry of pExtW->pWins when allRemains is true,
 * otherwise the window just before the current window index. For each filled
 * window the current window index is set to it, its output buffer is bound,
 * extWinAggDo() is called for one row of the empty-input block, and
 * pExtW->lastWinId is updated (even if extWinAggDo failed).
 *
 * On success the expression contexts are re-bound to pBlock (when non-NULL)
 * and, unless allRemains is set, the current window index is restored to the
 * value it had on entry.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered (logged).
 */
static int32_t extWinAggHandleEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains, SExtWinTimeWindow* pWin) {
  int32_t                  code = 0, lino = 0;
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
  SExprSupp*               pSup = &pOperator->exprSupp;
  int32_t                  savedIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);

  if (NULL == pExtW->pEmptyInputBlock || (pWin && pWin->tw.skey == pExtW->lastSKey)) {
    goto _exit;
  }

  bool         isAsc = (TSDB_ORDER_ASC == pExtW->binfo.inputTsOrder);  // only used for logging
  int32_t      lastIdx = allRemains ? (pExtW->pWins->size - 1) : (savedIdx - 1);
  int32_t      firstIdx = pExtW->lastWinId + 1;
  SSDataBlock* pEmptyBlk = pExtW->pEmptyInputBlock;

  if (firstIdx <= lastIdx) {
    // at least one window to fill: feed the empty-input block to the contexts
    TAOS_CHECK_EXIT(setInputDataBlock(pSup, pExtW->pEmptyInputBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
  }

  for (int32_t i = firstIdx; i <= lastIdx; ++i) {
    SExtWinTimeWindow* pEmptyWin = taosArrayGet(pExtW->pWins, i);

    extWinSetCurWinIdx(pOperator, i);
    qDebug("%s %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
           GET_TASKID(pOperator->pTaskInfo), i, pEmptyWin->tw.skey, pEmptyWin->tw.ekey, isAsc);

    TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pEmptyWin, pSup, &pExtW->aggSup, pOperator->pTaskInfo));

    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pEmptyWin->tw, 1);
    code = extWinAggDo(pOperator, 0, 1, pEmptyBlk);
    // the window counts as visited regardless of the outcome
    pExtW->lastWinId = i;
    TAOS_CHECK_EXIT(code);
  }

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
  } else {
    if (pBlock) {
      // a failure here jumps back to _exit with code set, which takes the error branch above
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true));
    }

    if (!allRemains) {
      extWinSetCurWinIdx(pOperator, savedIdx);
    }
  }

  return code;
}
1718

1719
/*
 * Aggregates pInputBlock into every external window reported for it.
 *
 * For each window returned by pExtW->getWinFp:
 *   - windows skipped since the last processed one are filled through
 *     extWinAggHandleEmptyWins();
 *   - the scalar expressions, if any, are evaluated in place on the input
 *     block (once per call, when the first window is handled);
 *   - the output buffer is (re)bound unless the window starts at the same
 *     skey as the previously processed window;
 *   - extWinAggDo() consumes winRows rows starting at startPos.
 * Afterwards lastSKey / lastWinId are updated and the row position advances.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered (logged).
 */
static int32_t extWinAggOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = 0, lino = 0;
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
  int32_t                  rowPos = 0;
  int32_t                  rowCnt = 0;
  int64_t*                 pTs = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  bool                     isAsc = (TSDB_ORDER_ASC == pExtW->binfo.inputTsOrder);  // only used for logging
  SExtWinTimeWindow*       pCurWin = NULL;
  bool                     scalarDone = false;

  for (;;) {
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, pTs, &rowPos, &pInputBlock->info, &pCurWin, &rowCnt));
    if (NULL == pCurWin) {
      break;
    }

    TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pInputBlock, false, pCurWin));

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") agg start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pOperator->pTaskInfo), pCurWin->tw.skey, pCurWin->tw.ekey, isAsc, rowPos, rowCnt);

    if (!scalarDone) {
      SExprSupp* pScalarSup = &pExtW->scalarSupp;
      if (pScalarSup->pExprInfo) {
        TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pInputBlock, pInputBlock, pScalarSup->pCtx,
                                              pScalarSup->numOfExprs, pExtW->pPseudoColInfo,
                                              GET_STM_RTINFO(pOperator->pTaskInfo)));
      }

      scalarDone = true;
    }

    if (pExtW->lastSKey != pCurWin->tw.skey) {
      TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pCurWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo));
    }

    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pCurWin->tw, 1);
    TAOS_CHECK_EXIT(extWinAggDo(pOperator, rowPos, rowCnt, pInputBlock));

    pExtW->lastSKey = pCurWin->tw.skey;
    pExtW->lastWinId = extWinGetCurWinIdx(pOperator->pTaskInfo);
    rowPos += rowCnt;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1769

1770
/*
 * Copies the aggregated result rows of the remaining windows into the
 * operator's result block.
 *
 * The result block and pExtW->pWinRowIdx are cleared first. Windows are then
 * visited from pExtW->outputWinId onwards; windows without an output slot
 * (winOutIdx < 0) or without result rows are skipped. For each remaining
 * window the block is grown when needed, the result row is copied in, and the
 * row's window index is appended to pWinRowIdx. The loop stops early once the
 * block holds at least pOperator->resultInfo.threshold rows.
 *
 * pExtW->outputWinId always ends up pointing at the first window that has NOT
 * been emitted yet. This includes the early stop: `break` bypasses the loop's
 * increment expression, so the cursor is advanced explicitly there; otherwise
 * a later call would emit the same window a second time.
 *
 * On success *ppRes is the result block when it has rows, NULL otherwise, and
 * funcInfo.pStreamBlkWinIdx is set to pWinRowIdx. On error *ppRes is left
 * untouched.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered (logged).
 */
static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*             pBlock = pExtW->binfo.pRes;
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExprInfo*               pExprInfo = pOperator->exprSupp.pExprInfo;
  int32_t                  numOfExprs = pOperator->exprSupp.numOfExprs;
  int32_t*                 rowEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;
  SqlFunctionCtx*          pCtx = pOperator->exprSupp.pCtx;
  int32_t                  numOfWin = pExtW->outWinIdx;

  pBlock->info.version = pTaskInfo->version;
  blockDataCleanup(pBlock);
  taosArrayClear(pExtW->pWinRowIdx);

  for (int32_t i = pExtW->outputWinId; i < pExtW->pWins->size; ++i, pExtW->outputWinId += 1) {
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
    int32_t            winIdx = pWin->winOutIdx;
    if (winIdx < 0) {
      // this window never received an output slot
      continue;
    }
    SResultRow* pRow = (SResultRow*)((char*)pExtW->pResultRow + winIdx * pExtW->aggSup.resultRowSize);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      continue;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      // grow for this row plus some headroom for the output slots after it
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + numOfWin - winIdx;
      TAOS_CHECK_EXIT(blockDataEnsureCapacity(pBlock, newSize));
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
    }

    TAOS_CHECK_EXIT(copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo));

    pBlock->info.rows += pRow->numOfRows;

    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pRow->winIdx, pRow->numOfRows));

    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
      // window i is already in the block; `break` skips the for-increment, so move the cursor here
      pExtW->outputWinId += 1;
      break;
    }
  }

  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);

  pBlock->info.dataLoad = 1;

  *ppRes = (pBlock->info.rows > 0) ? pBlock : NULL;
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;

_exit:

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1835

1836
/*
 * Makes sure pExtW->pResultRow can hold winNum result rows.
 *
 * Nothing happens in scalar mode, or when the current capacity already covers
 * winNum. Otherwise the old buffer is released (capacity is set to -1 while
 * there is no buffer) and a zero-initialised buffer of
 * winNum * aggSup.resultRowSize bytes is allocated.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (logged).
 */
static int32_t extWinInitResultRow(SExecTaskInfo*        pTaskInfo, SExternalWindowOperator* pExtW, int32_t winNum) {
  bool scalarMode = (EEXT_MODE_SCALAR == pExtW->mode);
  if (scalarMode || winNum <= pExtW->resultRowCapacity) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0, lino = 0;

  // drop the undersized buffer before allocating the replacement
  taosMemoryFreeClear(pExtW->pResultRow);
  pExtW->resultRowCapacity = -1;

  pExtW->pResultRow = taosMemoryCalloc(winNum, pExtW->aggSup.resultRowSize);
  QUERY_CHECK_NULL(pExtW->pResultRow, code, lino, _exit, terrno);

  pExtW->resultRowCapacity = winNum;

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1863

1864
/*
 * Releases the operator's large buffers once they are no longer needed.
 *
 * - The result-row buffer is freed (and its capacity reset to -1) when
 *   resultRowCapacity * resultRowSize reaches 1 MiB.
 * - The columns of the result block are freed when
 *   rows * resultRowSize reaches 1 MiB.
 *
 * Both products are computed in 64 bits. The capacity and the row size are
 * 32-bit values, and a 32-bit product can overflow for exactly the large
 * buffers this check is meant to catch, which would leave them allocated.
 */
static void extWinFreeResultRow(SExternalWindowOperator* pExtW) {
  const int64_t releaseThreshold = 1048576;  // 1 MiB
  const int64_t rowSize = pExtW->aggSup.resultRowSize;

  if ((int64_t)pExtW->resultRowCapacity * rowSize >= releaseThreshold) {
    taosMemoryFreeClear(pExtW->pResultRow);
    pExtW->resultRowCapacity = -1;
  }
  if (pExtW->binfo.pRes && (int64_t)pExtW->binfo.pRes->info.rows * rowSize >= releaseThreshold) {
    blockDataFreeCols(pExtW->binfo.pRes);
  }
}
121,182✔
1873

1874
/*
 * Builds the external window list (pExtW->pWins) from the stream runtime
 * info. Does nothing when the list already has entries.
 *
 * One window is reserved per entry of funcInfo.pStreamPesudoFuncVals and the
 * result-row buffer is sized accordingly. The window ranges come from
 * scalarCalculateExtWinsTimeRange() when a time-range expression with
 * needCalc is present; otherwise each window is [wstart, wend] of the
 * matching trigger parameter, with the end extended by 1 unless the trigger
 * type is STREAM_TRIGGER_SLIDING, and its winOutIdx is set to -1.
 *
 * Finally outputWinId and blkWinStartIdx are set to funcInfo.curIdx and
 * lastWinId to -1.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered (logged).
 */
static int32_t extWinInitWindowList(SExternalWindowOperator* pExtW, SExecTaskInfo*        pTaskInfo) {
  if (taosArrayGetSize(pExtW->pWins) > 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                 code = 0, lino = 0;
  SStreamRuntimeFuncInfo* pFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
  size_t                  winNum = taosArrayGetSize(pFuncInfo->pStreamPesudoFuncVals);
  SExtWinTimeWindow*      pWinArr = taosArrayReserve(pExtW->pWins, winNum);
  TSDB_CHECK_NULL(pWinArr, code, lino, _exit, terrno);

  TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, winNum));

  bool calcRange = pExtW->timeRangeExpr && pExtW->timeRangeExpr->needCalc;
  if (!calcRange) {
    for (int32_t i = 0; i < winNum; ++i) {
      SSTriggerCalcParam* pParam = taosArrayGet(pFuncInfo->pStreamPesudoFuncVals, i);
      SExtWinTimeWindow*  pCur = &pWinArr[i];

      pCur->tw.skey = pParam->wstart;
      pCur->tw.ekey = pParam->wend + ((pFuncInfo->triggerType != STREAM_TRIGGER_SLIDING) ? 1 : 0);
      pCur->winOutIdx = -1;

      qDebug("%s the %d/%d ext window initialized, TR[%" PRId64 ", %" PRId64 ")", 
          pTaskInfo->id.str, i, (int32_t)winNum, pCur->tw.skey, pCur->tw.ekey);
    }
  } else {
    TAOS_CHECK_EXIT(scalarCalculateExtWinsTimeRange(pExtW->timeRangeExpr, pFuncInfo, pWinArr));

    // only walk the windows again when debug logging is enabled
    if (qDebugFlag & DEBUG_DEBUG) {
      for (int32_t i = 0; i < winNum; ++i) {
        qDebug("%s the %d/%d ext window calced initialized, TR[%" PRId64 ", %" PRId64 ")", 
            pTaskInfo->id.str, i, (int32_t)winNum, pWinArr[i].tw.skey, pWinArr[i].tw.ekey);
      }
    }
  }

  pExtW->outputWinId = pFuncInfo->curIdx;
  pExtW->lastWinId = -1;
  pExtW->blkWinStartIdx = pFuncInfo->curIdx;

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1920

1921
/*
 * Tells whether a result block can already be handed out in the non-aggregate
 * modes, without reading more input.
 *
 * Evaluated in this order:
 *   1. multi-table mode with unordered input            -> false
 *   2. more than one window is pending output and no
 *      time-range expression needs calculation          -> true
 *   3. the list of the window at outputWinId is missing
 *      or empty                                         -> false
 *   4. that list holds more than one block              -> true
 *   5. otherwise: true only when the last window index recorded for the
 *      list's head block is below pExtW->blkWinStartIdx.
 */
static bool extWinNonAggGotResBlock(SExternalWindowOperator* pExtW) {
  if (pExtW->multiTableMode && !pExtW->inputHasOrder) {
    return false;
  }

  int32_t pendingWins = pExtW->outWinIdx - pExtW->outputWinId;
  bool    needCalc = (NULL != pExtW->timeRangeExpr) && pExtW->timeRangeExpr->needCalc;
  if (pendingWins > 1 && !needCalc) {
    return true;
  }

  SList* pBlkList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
  if (NULL == pBlkList || listNEles(pBlkList) <= 0) {
    return false;
  }
  if (listNEles(pBlkList) > 1) {
    return true;
  }

  // exactly one block buffered: inspect its window-index array (second pointer stored in the node)
  SListNode* pHead = listHead(pBlkList);
  SArray*    pWinIdxArr = ((SArray**)pHead->data)[1];
  int32_t*   pLastWinIdx = taosArrayGetLast(pWinIdxArr);

  return (NULL != pLastWinIdx) && (*pLastWinIdx < pExtW->blkWinStartIdx);
}
1947

1948
/*
 * Open function of the external window operator: pulls blocks from the
 * downstream operator and dispatches them to the handler of the current mode.
 *
 * Returns immediately when the operator is already marked opened. Otherwise
 * the window list is initialised and blocks are consumed until the downstream
 * is exhausted; at that point, in aggregate mode, the windows that are still
 * missing are filled via extWinAggHandleEmptyWins(), blkWinStartIdx is set to
 * the number of windows, and the operator is marked opened.
 *
 * In the scalar and indefinite-rows modes the function returns early, without
 * marking the operator opened, as soon as extWinNonAggGotResBlock() reports a
 * result block.
 *
 * On error the code is logged, stored in pTaskInfo->code, and T_LONG_JMP is
 * invoked on the task's env.
 */
static int32_t extWinOpen(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SExternalWindowOperator* pExtW = pOperator->info;

  TAOS_CHECK_EXIT(extWinInitWindowList(pExtW, pTaskInfo));

  for (;;) {
    // the blk* fields are reset before every downstream block
    pExtW->blkWinIdx = -1;
    pExtW->blkWinStartSet = false;
    pExtW->blkRowStartIdx = 0;

    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (NULL == pBlock) {
      if (EEXT_MODE_AGG == pExtW->mode) {
        TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pBlock, true, NULL));
      }
      pExtW->blkWinStartIdx = pExtW->pWins->size;
      break;
    }

    printDataBlock(pBlock, __func__, pTaskInfo->id.str, pTaskInfo->id.queryId);

    qDebug("ext window mode:%d got %" PRId64 " rows from downstream", pExtW->mode, pBlock->info.rows);

    bool resReady = false;
    switch (pExtW->mode) {
      case EEXT_MODE_SCALAR:
        TAOS_CHECK_EXIT(extWinProjectOpen(pOperator, pBlock));
        resReady = extWinNonAggGotResBlock(pExtW);
        break;
      case EEXT_MODE_AGG:
        TAOS_CHECK_EXIT(extWinAggOpen(pOperator, pBlock));
        break;
      case EEXT_MODE_INDEFR_FUNC:
        TAOS_CHECK_EXIT(extWinIndefRowsOpen(pOperator, pBlock));
        resReady = extWinNonAggGotResBlock(pExtW);
        break;
      default:
        break;
    }

    if (resReady) {
      // a block can be output now; the operator stays un-opened so input continues on the next call
      return code;
    }
  }

  OPTR_SET_OPENED(pOperator);

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return code;
}
2024

2025
/*
 * Produces the next result block of the external window operator.
 *
 * Returns NULL in *ppRes right away when the operator status is OP_EXEC_DONE.
 * Otherwise the block node handed out by the previous call is recycled, the
 * operator's open function is run, and the result is built by mode:
 *   - scalar / indefinite-rows: extWinNonAggOutputRes(); when it yields no
 *     block the operator is marked completed and large buffers are released.
 *   - any other mode: extWinAggOutputRes() is called once, after which the
 *     operator is marked completed and large buffers are released.
 *
 * The rows of a returned block are added to resultInfo.totalRows, and the
 * block is printed when the task runs in stream execution mode.
 *
 * On error the code is logged, stored in pTaskInfo->code, and T_LONG_JMP is
 * invoked on the task's env.
 */
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;

  if (OP_EXEC_DONE == pOperator->status) {
    *ppRes = NULL;
    return code;
  }

  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);

  TAOS_CHECK_EXIT(pOperator->fpSet._openFn(pOperator));

  bool nonAggMode = (EEXT_MODE_SCALAR == pExtW->mode) || (EEXT_MODE_INDEFR_FUNC == pExtW->mode);
  if (!nonAggMode) {
    TAOS_CHECK_EXIT(extWinAggOutputRes(pOperator, ppRes));
    setOperatorCompleted(pOperator);
    extWinFreeResultRow(pExtW);
  } else {
    TAOS_CHECK_EXIT(extWinNonAggOutputRes(pOperator, ppRes));
    if (NULL == *ppRes) {
      setOperatorCompleted(pOperator);
      extWinFreeResultRow(pExtW);
    }
  }

  if (NULL != *ppRes) {
    pOperator->resultInfo.totalRows += (*ppRes)->info.rows;
  }

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if ((*ppRes) && OPTR_EXEC_MODEL_STREAM == pTaskInfo->execModel) {
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
  }

  return code;
}
2080

2081

2082
/*
 * Create an external window operator from its physical plan node.
 *
 * pDownstream: the single downstream operator; it is attached to the new
 *              operator on success and passed to
 *              destroyOperatorAndDownstreams() on failure.
 * pNode:       physical plan node, treated as SExternalWindowPhysiNode.
 * pTaskInfo:   owning task; on failure its `code` field receives the error.
 * pOptrOut:    receives the new operator on success.
 *
 * The operator mode is derived from the plan node:
 *   - window.pProjs set        -> EEXT_MODE_SCALAR
 *   - window.indefRowsFunc set -> EEXT_MODE_INDEFR_FUNC
 *   - otherwise                -> EEXT_MODE_AGG
 *
 * Returns 0 on success, otherwise the error code of the step that failed.
 */
int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
                                     SOperatorInfo** pOptrOut) {
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
  QRY_PARAM_CHECK(pOptrOut);
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));

  // Only touch the operator when its allocation succeeded; the combined
  // allocation check below handles the failure case.
  if (pOperator != NULL) {
    pOperator->pPhyNode = pNode;
  }
  if (!pExtW || !pOperator) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
                  pExtW, pTaskInfo);

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pExtW->binfo, pResBlock);

  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR
                                        : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
  // The input order is forced to ascending on both the plan node and the operator.
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;

  if (pTaskInfo->pStreamRuntimeInfo != NULL) {
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
  }

  if (pPhynode->window.pProjs) {
    // Scalar mode: projection expressions plus output/free block containers.
    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);

    // QUERY_CHECK_NULL stores terrno into `code`, so an allocation failure
    // cannot reach _error with code still 0.
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
    QUERY_CHECK_NULL(pExtW->pOutputBlocks, code, lino, _error, terrno);

    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
  } else if (pExtW->mode == EEXT_MODE_AGG) {
    // Aggregate mode: optional pre-computed scalar expressions, then the aggregate support.
    if (pPhynode->window.pExprs != NULL) {
      int32_t    num = 0;
      SExprInfo* pSExpr = NULL;
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
      QUERY_CHECK_CODE(code, lino, _error);

      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
      QUERY_CHECK_CODE(code, lino, _error);  // also records the failing line for the error log
      checkIndefRowsFuncs(&pExtW->scalarSupp);
    }

    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
    initResultSizeInfo(&pOperator->resultInfo, 4096);

    pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
    TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);

    int32_t    num = 0;
    SExprInfo* pExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
    QUERY_CHECK_CODE(code, lino, _error);
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, 0, 0);
    QUERY_CHECK_CODE(code, lino, _error);

    // When a count-like function is present an empty input block is prepared.
    nodesWalkExprs(pPhynode->window.pFuncs, extWinHasCountLikeFunc, &pExtW->hasCountFunc);
    if (pExtW->hasCountFunc) {
      code = extWinCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
      QUERY_CHECK_CODE(code, lino, _error);
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
    } else {
      qDebug("%s ext window doesn't have CountLikeFunc", pOperator->pTaskInfo->id.str);
    }

    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
    QUERY_CHECK_CODE(code, lino, _error);

    pExtW->lastSKey = INT64_MIN;
  } else {
    // Indefinite-rows function mode.
    size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

    if (pPhynode->window.pExprs != NULL) {
      int32_t    num = 0;
      SExprInfo* pSExpr = NULL;
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
      QUERY_CHECK_CODE(code, lino, _error);

      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
      QUERY_CHECK_CODE(code, lino, _error);  // also records the failing line for the error log
    }

    int32_t    numOfExpr = 0;
    SExprInfo* pExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
    TSDB_CHECK_CODE(code, lino, _error);

    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
                      pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
    TSDB_CHECK_CODE(code, lino, _error);
    pOperator->exprSupp.hasWindowOrGroup = false;

    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                              pTaskInfo->pStreamRuntimeInfo);
    TSDB_CHECK_CODE(code, lino, _error);

    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
    TSDB_CHECK_CODE(code, lino, _error);

    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
    QUERY_CHECK_NULL(pExtW->pOutputBlocks, code, lino, _error, terrno);

    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
  }

  pExtW->pWins = taosArrayInit(4096, sizeof(SExtWinTimeWindow));
  QUERY_CHECK_NULL(pExtW->pWins, code, lino, _error, terrno);

  pExtW->timeRangeExpr = (STimeRangeNode*)pPhynode->pTimeRange;
  if (pExtW->timeRangeExpr) {
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pStart, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pEnd, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
  }

  // pStreamRuntimeInfo may be NULL (it is NULL-checked above), so guard the
  // dereference; without stream runtime info no sliding-overlap option is assumed.
  bool slidingOverlap = (pTaskInfo->pStreamRuntimeInfo != NULL) &&
                        ((pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP) != 0);
  bool useOvlpWin = (pExtW->timeRangeExpr != NULL) && (pExtW->timeRangeExpr->needCalc || slidingOverlap);

  // Pick the window lookup routine by table mode and by whether windows may overlap.
  if (pPhynode->isSingleTable) {
    pExtW->getWinFp = useOvlpWin ? extWinGetOvlpWin : extWinGetNoOvlpWin;
    pExtW->multiTableMode = false;
  } else {
    pExtW->getWinFp = useOvlpWin ? extWinGetMultiTbOvlpWin : extWinGetMultiTbNoOvlpWin;
    pExtW->multiTableMode = true;
  }
  pExtW->inputHasOrder = pPhynode->inputHasOrder;

  pOperator->fpSet = createOperatorFpSet(extWinOpen, extWinNext, NULL, destroyExternalWindowOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
  code = appendDownstream(pOperator, &pDownstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrOut = pOperator;
  return code;

_error:

  // Release the operator state first, then the operator shell and the downstream.
  if (pExtW != NULL) {
    destroyExternalWindowOperatorInfo(pExtW);
  }

  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
2260

2261

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc