• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4849

13 Nov 2025 07:17AM UTC coverage: 63.829% (+0.6%) from 63.27%
#4849

push

travis-ci

guanshengliang
test: enable auto upload

148928 of 233322 relevant lines covered (63.83%)

115593255.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.34
/source/libs/executor/src/externalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22
#include "cmdnodes.h"
23

24
typedef struct SBlockList {
25
  const SSDataBlock* pSrcBlock;
26
  SList*             pBlocks;
27
  int32_t            blockRowNumThreshold;
28
} SBlockList;
29

30

31
// Signature of the window-lookup callback stored in SExternalWindowOperator.getWinFp.
// NOTE(review): the meaning of the individual parameters is not visible in this part of the file.
typedef int32_t (*extWinGetWinFp)(SOperatorInfo*, int64_t*, int32_t*, SDataBlockInfo*, SExtWinTimeWindow**, int32_t*);
32

33
/*
 * Counters describing how result blocks are allocated and reused by the
 * external window operator. They are only incremented and logged.
 */
typedef struct SExtWindowStat {
  int64_t resBlockCreated;    // bumped when extWinBlockListAddBlock allocates a fresh block
  int64_t resBlockDestroyed;  // bumped when a block node is destroyed instead of being recycled
  int64_t resBlockRecycled;   // bumped when a block node is returned to the free list
  int64_t resBlockReused;     // bumped when a block node is taken from the free list
  int64_t resBlockAppend;     // bumped when the tail block of a list still has room for new rows
} SExtWindowStat;
40

41
typedef struct SExternalWindowOperator {
42
  SOptrBasicInfo     binfo;
43
  SExprSupp          scalarSupp;
44
  int32_t            primaryTsIndex;
45
  EExtWinMode        mode;
46
  bool               multiTableMode;
47
  bool               inputHasOrder;
48
  SArray*            pWins;           // SArray<SExtWinTimeWindow>
49
  SArray*            pPseudoColInfo;  
50
  STimeRangeNode*    timeRangeExpr;
51
  
52
  extWinGetWinFp     getWinFp;
53

54
  bool               blkWinStartSet;
55
  int32_t            blkWinStartIdx;
56
  int32_t            blkWinIdx;
57
  int32_t            blkRowStartIdx;
58
  int32_t            outputWinId;
59
  int32_t            outWinIdx;
60

61
  // for project&indefRows
62
  SList*             pFreeBlocks;    // SList<SSDatablock*+SAarray*>
63
  SArray*            pOutputBlocks;  // SArray<SList*>, for each window, we have a list of blocks
64
  SListNode*         pLastBlkNode; 
65
  SSDataBlock*       pTmpBlock;
66
  
67
  // for agg
68
  SAggSupporter      aggSup;
69
  STimeWindowAggSupp twAggSup;
70

71
  int32_t            resultRowCapacity;
72
  SResultRow*        pResultRow;
73

74
  int64_t            lastSKey;
75
  int32_t            lastWinId;
76
  SSDataBlock*       pEmptyInputBlock;
77
  bool               hasCountFunc;
78
  SExtWindowStat     stat;
79
  SArray*            pWinRowIdx;
80
} SExternalWindowOperator;
81

82

83
/*
 * Appends one result block (plus its companion index array) to pList.
 *
 * A node is taken from pExtW->pFreeBlocks when one is available; otherwise a
 * new block is created from pExtW->binfo.pRes with capacity for at least
 * max(rows, 4096) rows, together with a new index array.
 *
 * Each list node carries two pointers back to back: the SSDataBlock* first and
 * the SArray* second.
 * NOTE(review): this relies on the list element size being two pointers wide —
 * confirm where the lists are created.
 *
 * @param pExtW    operator state (free list, template block, statistics)
 * @param pList    list that receives the block node
 * @param rows     number of rows the caller is about to add
 * @param ppBlock  out: the block now at the tail of pList
 * @param ppIdx    out: the index array paired with *ppBlock
 * @return 0 on success, otherwise the error code of the failing step
 */
static int32_t extWinBlockListAddBlock(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
  SSDataBlock* pRes = NULL;
  SArray*      pIdx = NULL;
  int32_t      code = 0, lino = 0;

  if (listNEles(pExtW->pFreeBlocks) > 0) {
    // Reuse a recycled node: move it from the free list to the tail of pList.
    SListNode* pNode = tdListPopHead(pExtW->pFreeBlocks);
    *ppBlock = *(SSDataBlock**)pNode->data;
    *ppIdx = *(SArray**)((SArray**)pNode->data + 1);
    tdListAppendNode(pList, pNode);
    pExtW->stat.resBlockReused++;
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pExtW->binfo.pRes, false, &pRes));
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, TMAX(rows, 4096)));
    pIdx = taosArrayInit(10, sizeof(int64_t));
    TSDB_CHECK_NULL(pIdx, code, lino, _exit, terrno);
    void* res[2] = {pRes, pIdx};
    TAOS_CHECK_EXIT(tdListAppend(pList, res));

    *ppBlock = pRes;
    *ppIdx = pIdx;
    pExtW->stat.resBlockCreated++;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // Only the "create" branch can fail, and there neither object has been
    // handed to the list yet, so both must be released here.
    blockDataDestroy(pRes);
    taosArrayDestroy(pIdx);
  }

  return code;
}
115

116
/*
 * Returns the block at the tail of pList if it can still take `rows` more
 * rows; otherwise appends a new block via extWinBlockListAddBlock and returns
 * that one.
 *
 * @param ppBlock  out: block to write into
 * @param ppIdx    out: index array paired with *ppBlock
 * @return 0 on success, otherwise the error from extWinBlockListAddBlock
 */
static int32_t extWinGetLastBlockFromList(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
  int32_t      code = 0, lino = 0;
  SSDataBlock* pTailBlk = NULL;
  SListNode*   pTail = TD_DLIST_TAIL(pList);

  // A new block is needed when the list is empty or the tail block is too full.
  bool needNew = (NULL == pTail);
  if (!needNew) {
    pTailBlk = *(SSDataBlock**)pTail->data;
    needNew = (pTailBlk->info.rows + rows) > pTailBlk->info.capacity;
  }

  if (needNew) {
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
    return code;
  }

  // The second pointer stored in the node is the index array.
  *ppIdx = *(SArray**)((SSDataBlock**)pTail->data + 1);
  *ppBlock = pTailBlk;
  pExtW->stat.resBlockAppend++;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
144

145
static void extWinDestroyBlockList(void* p) {
16,875,520✔
146
  if (NULL == p) {
16,875,520✔
147
    return;
×
148
  }
149

150
  SListNode* pTmp = NULL;
16,875,520✔
151
  SList** ppList = (SList**)p;
16,875,520✔
152
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
16,875,520✔
153
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
154
    while (pNode) {
×
155
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
156
      blockDataDestroy(pBlock);
×
157
      SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
×
158
      taosArrayDestroy(pIdx);
×
159
      pTmp = pNode;
×
160
      pNode = pNode->dl_next_;
×
161
      taosMemoryFree(pTmp);
×
162
    }
163
  }
164
  taosMemoryFree(*ppList);
16,875,520✔
165
}
166

167

168
/*
 * Gives a block node back to the operator.
 *
 * While the free list holds fewer than 10 nodes, the block and its index array
 * are emptied and the node is pushed to the front of pExtW->pFreeBlocks.
 * Otherwise the block, the index array and the node are destroyed.
 * In both cases *ppNode ends up NULL. A NULL ppNode or *ppNode is a no-op.
 */
static void extWinRecycleBlkNode(SExternalWindowOperator* pExtW, SListNode** ppNode) {
  if (ppNode == NULL || *ppNode == NULL) {
    return;
  }

  SListNode*   pNode = *ppNode;
  SSDataBlock* pBlk = *(SSDataBlock**)pNode->data;
  SArray*      pRowIdx = *(SArray**)((SArray**)pNode->data + 1);

  if (listNEles(pExtW->pFreeBlocks) < 10) {
    // Room left in the cache: reset the contents and keep the node for reuse.
    blockDataCleanup(pBlk);
    taosArrayClear(pRowIdx);
    tdListPrependNode(pExtW->pFreeBlocks, pNode);
    *ppNode = NULL;
    pExtW->stat.resBlockRecycled++;
    return;
  }

  // Cache is full: release everything.
  blockDataDestroy(pBlk);
  taosArrayDestroy(pRowIdx);
  taosMemoryFreeClear(*ppNode);
  pExtW->stat.resBlockDestroyed++;
}
190

191
/*
 * Recycles every node of the list stored in the SList* slot `p` points at
 * (see extWinRecycleBlkNode), then frees the list header itself.
 */
static void extWinRecycleBlockList(SExternalWindowOperator* pExtW, void* p) {
  if (p == NULL) {
    return;
  }

  SList** ppList = (SList**)p;
  if ((*ppList) != NULL && TD_DLIST_NELES(*ppList) > 0) {
    SListNode* pCur = TD_DLIST_HEAD(*ppList);
    while (pCur != NULL) {
      // Step forward before handing the node over; recycling relinks or frees it.
      SListNode* pVictim = pCur;
      pCur = pCur->dl_next_;
      extWinRecycleBlkNode(pExtW, &pVictim);
    }
  }

  taosMemoryFree(*ppList);
}
208
/*
 * Destroys one block node: its data block, its index array and the node
 * itself, and counts it in stat.resBlockDestroyed. NULL is ignored.
 */
static void extWinDestroyBlkNode(SExternalWindowOperator* pInfo, SListNode* pNode) {
  if (pNode == NULL) {
    return;
  }

  // The node payload is {SSDataBlock*, SArray*}.
  SSDataBlock* pBlk = *(SSDataBlock**)pNode->data;
  SArray*      pRowIdx = *(SArray**)((SArray**)pNode->data + 1);

  blockDataDestroy(pBlk);
  taosArrayDestroy(pRowIdx);
  taosMemoryFree(pNode);

  pInfo->stat.resBlockDestroyed++;
}
223

224

225
void destroyExternalWindowOperatorInfo(void* param) {
73,512✔
226
  if (NULL == param) {
73,512✔
227
    return;
×
228
  }
229
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
73,512✔
230
  cleanupBasicInfo(&pInfo->binfo);
73,512✔
231

232
  taosArrayDestroyEx(pInfo->pOutputBlocks, extWinDestroyBlockList);
73,512✔
233
  taosArrayDestroy(pInfo->pWins);
73,512✔
234
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
73,512✔
235
  taosArrayDestroy(pInfo->pWinRowIdx);
73,512✔
236
  
237
  taosArrayDestroy(pInfo->pPseudoColInfo);
73,512✔
238
  blockDataDestroy(pInfo->pTmpBlock);
73,512✔
239
  blockDataDestroy(pInfo->pEmptyInputBlock);
73,512✔
240

241
  extWinDestroyBlkNode(pInfo, pInfo->pLastBlkNode);
73,512✔
242
  if (pInfo->pFreeBlocks) {
73,512✔
243
    SListNode *node;
244
    while ((node = TD_DLIST_HEAD(pInfo->pFreeBlocks)) != NULL) {
824✔
245
      TD_DLIST_POP(pInfo->pFreeBlocks, node);
412✔
246
      extWinDestroyBlkNode(pInfo, node);
412✔
247
    }
248
    taosMemoryFree(pInfo->pFreeBlocks);
412✔
249
  }
250
  
251
  cleanupAggSup(&pInfo->aggSup);
73,512✔
252
  cleanupExprSupp(&pInfo->scalarSupp);
73,512✔
253
  taosMemoryFreeClear(pInfo->pResultRow);
73,512✔
254

255
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
73,512✔
256

257
  qDebug("ext window stat at destroy, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
73,512✔
258
      pInfo->stat.resBlockCreated, pInfo->stat.resBlockDestroyed, pInfo->stat.resBlockRecycled, 
259
      pInfo->stat.resBlockReused, pInfo->stat.resBlockAppend);
260

261
  taosMemoryFreeClear(pInfo);
73,512✔
262
}
263

264
static int32_t extWinOpen(SOperatorInfo* pOperator);
265
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
266

267
typedef struct SMergeAlignedExternalWindowOperator {
268
  SExternalWindowOperator* pExtW;
269
  int64_t curTs;
270
  SResultRow*  pResultRow;
271
} SMergeAlignedExternalWindowOperator;
272

273
void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
2,804✔
274
  SMergeAlignedExternalWindowOperator* pMlExtInfo = (SMergeAlignedExternalWindowOperator*)pOperator;
2,804✔
275
  destroyExternalWindowOperatorInfo(pMlExtInfo->pExtW);
2,804✔
276
  taosMemoryFreeClear(pMlExtInfo);
2,804✔
277
}
2,804✔
278

279
/*
 * Returns the raw timestamp column of pBlock, or NULL when the block has no
 * column array or its data is not loaded.
 *
 * When the block's window is still {0, 0}, the window is refreshed from the
 * timestamp column first. Failures do not return: the error is stored in
 * pTaskInfo->code and control leaves through T_LONG_JMP.
 */
int64_t* extWinExtractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
  if (pTsCol == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  TSKEY* tsCols = (int64_t*)pTsCol->pData;

  bool windowUnset = (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0);
  if (windowUnset) {
    int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
302

303
// Reads the current window index kept in the task's stream runtime info.
static int32_t extWinGetCurWinIdx(SExecTaskInfo* pTaskInfo) {
  int32_t curIdx = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  return curIdx;
}
306

307
// Advances the current window index in the task's stream runtime info by one.
static void extWinIncCurWinIdx(SOperatorInfo* pOperator) {
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx++;
}
×
311

312
// Stores `idx` as the current window index in the task's stream runtime info.
static void extWinSetCurWinIdx(SOperatorInfo* pOperator, int32_t idx) {
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
}
23,717,559✔
316

317

318
// Advances the output window index (funcInfo.curOutIdx) by one.
static void extWinIncCurWinOutIdx(SStreamRuntimeInfo* pStreamRuntimeInfo) {
  pStreamRuntimeInfo->funcInfo.curOutIdx += 1;
}
824✔
321

322

323
/*
 * Returns the window that follows the task's current window index in
 * pExtW->pWins, or NULL when the current window is the last one.
 */
static const STimeWindow* extWinGetNextWin(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
  int32_t curIdx = extWinGetCurWinIdx(pTaskInfo);

  if (curIdx + 1 < pExtW->pWins->size) {
    return taosArrayGet(pExtW->pWins, curIdx + 1);
  }

  return NULL;
}
328

329

330
/*
 * Records in pIdx that window `currWinIdx` starts at row
 * (pBlock->info.rows - rows) of pBlock.
 *
 * Each entry is one int64_t whose first 32-bit half holds the window index and
 * whose second half holds the row index. Nothing is appended when the last
 * entry already belongs to `currWinIdx`.
 *
 * @return 0 on success, terrno if the push fails
 */
static int32_t extWinAppendWinIdx(SExecTaskInfo*       pTaskInfo, SArray* pIdx, SSDataBlock* pBlock, int32_t currWinIdx, int32_t rows) {
  int32_t  code = 0, lino = 0;
  int64_t* pLastEntry = taosArrayGetLast(pIdx);
  int32_t* pLastWinIdx = (int32_t*)pLastEntry;

  // Same window as the previous entry: its start row is already recorded.
  if (pLastWinIdx != NULL && *pLastWinIdx == currWinIdx) {
    return code;
  }

  int64_t  entry = 0;
  int32_t* pHalves = (int32_t*)&entry;
  pHalves[0] = currWinIdx;
  pHalves[1] = pBlock->info.rows - rows;

  TSDB_CHECK_NULL(taosArrayPush(pIdx, &entry), code, lino, _exit, terrno);

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
356

357

358
static int32_t mergeAlignExtWinSetOutputBuf(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
3,648✔
359
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
360
  if (*pResult == NULL) {
3,648✔
361
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
2,412✔
362
    if (!*pResult) {
2,412✔
363
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
364
      return terrno;
×
365
    }
366
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
2,412✔
367
  }
368
  
369
  (*pResult)->win = *pWin;
3,648✔
370
  (*pResult)->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
3,648✔
371
  
372
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
3,648✔
373
}
374

375

376
/*
 * Finds the window whose start key equals `ts`, scanning pExtW->pWins forward
 * from the task's current window index. On a match the current window index
 * is moved to that window and *ppWin is set.
 *
 * @return TSDB_CODE_SUCCESS on a match; TSDB_CODE_STREAM_INTERNAL_ERROR when
 *         `ts` lies before the start of a scanned window or no window matches
 */
static int32_t mergeAlignExtWinGetWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts, STimeWindow** ppWin) {
  int32_t firstIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);

  // TODO handle desc order
  for (int32_t i = firstIdx; i < pExtW->pWins->size; ++i) {
    STimeWindow* pCand = taosArrayGet(pExtW->pWins, i);

    if (ts < pCand->skey) {
      // Every window from here on is scanned in order, so ts matches none of them.
      qError("invalid ts %" PRId64 " for current window idx %d skey %" PRId64, ts, i, pCand->skey);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    if (ts == pCand->skey) {
      extWinSetCurWinIdx(pOperator, i);
      *ppWin = pCand;
      return TSDB_CODE_SUCCESS;
    }
  }

  qError("invalid ts %" PRId64 " to find merge aligned ext window, size:%d", ts, (int32_t)pExtW->pWins->size);
  return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
395

396
/*
 * Finalizes the result row at pResultRowInfo->cur into pResultBlock and, when
 * the row produced output, records the window-to-row mapping in
 * pExtW->pWinRowIdx.
 *
 * @return 0 on success, otherwise the error from extWinAppendWinIdx
 */
static int32_t mergeAlignExtWinFinalizeResult(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pResultBlock) {
  int32_t                              code = 0, lino = 0;
  SMergeAlignedExternalWindowOperator* pAligned = pOperator->info;
  SExternalWindowOperator*             pExtW = pAligned->pExtW;
  SResultRow*                          pRow = pAligned->pResultRow;

  finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, &pOperator->exprSupp, pResultBlock,
                     pOperator->pTaskInfo);

  if (pRow->numOfRows > 0) {
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResultBlock, pRow->winIdx, pRow->numOfRows));
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
417

418
/*
 * Aggregates one input block for the merge-aligned operator.
 *
 * Rows are consumed in runs that share the same timestamp. Each run is fed to
 * the aggregate functions for the window whose start key equals that
 * timestamp; when the timestamp changes, the finished window is finalized into
 * pResultBlock and the shared result row is reset for the next window. The
 * last run of the block stays open so that it can continue in the next block.
 *
 * On any error this function does not return: it logs and leaves through
 * T_LONG_JMP.
 */
static int32_t mergeAlignExtWinAggDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedExternalWindowOperator* pAligned = pOperator->info;
  SExternalWindowOperator*             pExtW = pAligned->pExtW;
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                           pSup = &pOperator->exprSupp;
  int32_t                              code = 0, lino = 0;
  STimeWindow*                         pWin = NULL;
  int32_t                              startPos = 0;

  int64_t* tsCols = extWinExtractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
  TSKEY    ts = getStartTsKey(&pBlock->info.window, tsCols);

  code = mergeAlignExtWinGetWinFromTs(pOperator, pExtW, ts, &pWin);
  if (code) {
    qError("failed to get time window for ts:%" PRId64 ", prim ts index:%d, error:%s", ts, pExtW->primaryTsIndex, tstrerror(code));
    TAOS_CHECK_EXIT(code);
  }

  // The window left open by the previous block differs from this block's first
  // window: close it before starting the new one.
  if (pAligned->curTs != INT64_MIN && pAligned->curTs != pWin->skey) {
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
    resetResultRow(pAligned->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
  }

  TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pAligned->pResultRow, pSup, &pExtW->aggSup));

  int32_t currPos = startPos;
  pAligned->curTs = pWin->skey;

  for (currPos = currPos + 1; currPos < pBlock->info.rows; ++currPos) {
    if (tsCols[currPos] == pAligned->curTs) {
      continue;
    }

    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
           pAligned->curTs, startPos, currPos, tsCols[currPos]);

    // Rows [startPos, currPos) belong to the window that just ended.
    TAOS_CHECK_EXIT(applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
                                                    currPos - startPos, pBlock->info.rows, pSup->numOfExprs));

    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
    resetResultRow(pAligned->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));

    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[currPos], &pWin));

    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
    startPos = currPos;

    TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pAligned->pResultRow, pSup, &pExtW->aggSup));

    pAligned->curTs = pWin->skey;
  }

  // Trailing run: aggregated but not finalized here.
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
                                         currPos - startPos, pBlock->info.rows, pSup->numOfExprs);

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return code;
}
480

481
/*
 * Builds the window-to-row index for a projected block: for every distinct
 * consecutive timestamp in pInput, the matching window is looked up and an
 * entry is appended to pExtW->pWinRowIdx via extWinAppendWinIdx.
 *
 * pOperator must be the merge-aligned operator: its `info` is the
 * SMergeAlignedExternalWindowOperator wrapper, so the SExternalWindowOperator
 * state has to be reached through ->pExtW (the previous code treated the
 * wrapper itself as SExternalWindowOperator and read unrelated memory).
 *
 * @return 0 on success, otherwise the error of the failing lookup/append
 */
static int32_t mergeAlignExtWinBuildWinRowIdx(SOperatorInfo* pOperator, SSDataBlock* pInput, SSDataBlock* pResult) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  int64_t*     tsCols = extWinExtractTsCol(pInput, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  STimeWindow* pWin = NULL;
  int32_t      code = 0, lino = 0;
  int64_t      prevTs = INT64_MIN;

  for (int32_t i = 0; i < pInput->info.rows; ++i) {
    // Rows sharing the previous timestamp belong to the window already recorded.
    if (prevTs == tsCols[i]) {
      continue;
    }

    // The lookup also moves the task's current window index to the match.
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[i], &pWin));
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResult, extWinGetCurWinIdx(pOperator->pTaskInfo), pInput->info.rows - i));

    prevTs = tsCols[i];
  }

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
507

508
/*
 * Scalar (projection) path of the merge-aligned operator: applies the scalar
 * expressions to pBlock, writing into pResultBlock, then records which output
 * rows belong to which window.
 *
 * pOperator->info is the SMergeAlignedExternalWindowOperator wrapper; the
 * scalar expression support lives in the wrapped SExternalWindowOperator, so
 * it is reached through ->pExtW (the previous code cast the wrapper directly).
 *
 * @return 0 on success, otherwise the error of the failing step
 */
static int32_t mergeAlignExtWinProjectDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                         SSDataBlock* pResultBlock) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SExprSupp*                           pExprSup = &pExtW->scalarSupp;
  int32_t                              code = 0, lino = 0;

  TAOS_CHECK_EXIT(projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
                                        GET_STM_RTINFO(pOperator->pTaskInfo)));

  TAOS_CHECK_EXIT(mergeAlignExtWinBuildWinRowIdx(pOperator, pBlock, pResultBlock));

_exit:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
527

528
/*
 * Pulls blocks from downstream and processes them (projection or aggregation,
 * depending on pExtW->mode) until the result block reaches the operator's
 * threshold or the input is exhausted.
 *
 * When the input ends in aggregate mode with a window still open, that window
 * is finalized and the operator is marked completed. On success the window/row
 * index array is published through funcInfo.pStreamBlkWinIdx. On error the
 * code is stored in the task and control leaves through T_LONG_JMP.
 */
void mergeAlignExtWinDo(SOperatorInfo* pOperator) {
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedExternalWindowOperator* pAligned = pOperator->info;
  SExternalWindowOperator*             pExtW = pAligned->pExtW;
  SSDataBlock*                         pOut = pExtW->binfo.pRes;
  SExprSupp*                           pSup = &pOperator->exprSupp;
  int32_t                              code = 0;
  int32_t                              lino = 0;

  taosArrayClear(pExtW->pWinRowIdx);
  blockDataCleanup(pOut);

  for (;;) {
    SSDataBlock* pInput = getNextBlockFromDownstream(pOperator, 0);

    if (pInput == NULL) {
      // close last time window
      if (pAligned->curTs != INT64_MIN && EEXT_MODE_AGG == pExtW->mode) {
        TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, &pExtW->binfo.resultRowInfo, pOut));
      }
      setOperatorCompleted(pOperator);
      break;
    }

    pOut->info.scanFlag = pInput->info.scanFlag;
    code = setInputDataBlock(pSup, pInput, pExtW->binfo.inputTsOrder, pInput->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _exit);

    printDataBlock(pInput, __func__, "externalwindowAlign", pTaskInfo->id.queryId);
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);

    if (EEXT_MODE_SCALAR == pExtW->mode) {
      TAOS_CHECK_EXIT(mergeAlignExtWinProjectDo(pOperator, &pExtW->binfo.resultRowInfo, pInput, pOut));
    } else {
      TAOS_CHECK_EXIT(mergeAlignExtWinAggDo(pOperator, &pExtW->binfo.resultRowInfo, pInput, pOut));
    }

    // Enough rows gathered for one output batch.
    if (pOut->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }

  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
2,804✔
581

582
/*
 * "Get next" entry point of the merge-aligned external window operator.
 *
 * On the first call after a reset (pWins empty) the window list is filled from
 * the stream trigger parameters: each window starts at the parameter's wstart
 * and ends at wstart + 1. Then one batch is produced via mergeAlignExtWinDo.
 *
 * @param ppRes  out: the result block, or NULL when no rows were produced or
 *               the operator is already done
 * @return 0 on success; on failure the code is stored in the task and control
 *         leaves through T_LONG_JMP
 */
static int32_t mergeAlignExtWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedExternalWindowOperator* pAligned = pOperator->info;
  SExternalWindowOperator*             pExtW = pAligned->pExtW;
  int32_t                              code = 0;
  int32_t                              lino = 0;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pOut = pExtW->binfo.pRes;
  blockDataCleanup(pOut);

  if (taosArrayGetSize(pExtW->pWins) <= 0) {
    size_t       size = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
    STimeWindow* pWins = taosArrayReserve(pExtW->pWins, size);
    TSDB_CHECK_NULL(pWins, code, lino, _exit, terrno);

    for (int32_t i = 0; i < size; ++i) {
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
      STimeWindow*        pDst = &pWins[i];
      pDst->skey = pParam->wstart;
      pDst->ekey = pParam->wstart + 1;
    }

    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  }

  mergeAlignExtWinDo(pOperator);

  size_t rows = pOut->info.rows;
  pOperator->resultInfo.totalRows += rows;
  if (rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pOut;
  }

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
626

627
/*
 * Puts the merge-aligned operator back into its "not opened" state: clears the
 * window list, resets the basic operator state and the current-window
 * tracking, and rebuilds the aggregate support and the time-window column.
 *
 * @return 0 on success, otherwise the error from resetAggSup or
 *         initExecTimeWindowInfo
 */
int32_t resetMergeAlignedExtWinOperator(SOperatorInfo* pOperator) {
  SMergeAlignedExternalWindowOperator* pAligned = pOperator->info;
  SExternalWindowOperator*             pExtW = pAligned->pExtW;
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedIntervalPhysiNode*      pPhynode = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;

  pOperator->status = OP_NOT_OPENED;

  taosArrayClear(pExtW->pWins);

  resetBasicOperatorState(&pExtW->binfo);
  pAligned->pResultRow = NULL;
  pAligned->curTs = INT64_MIN;

  int32_t code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                             &pTaskInfo->storageAPI.functionStore);
  if (code != 0) {
    return code;
  }

  // Recreate the time-window column from scratch.
  colDataDestroy(&pExtW->twAggSup.timeWindowData);
  return initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
}
649

650
int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
2,804✔
651
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
652
  SMergeAlignedIntervalPhysiNode* pPhynode = (SMergeAlignedIntervalPhysiNode*)pNode;
2,804✔
653
  int32_t code = 0;
2,804✔
654
  int32_t lino = 0;
2,804✔
655
  SMergeAlignedExternalWindowOperator* pMlExtInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
2,804✔
656
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,804✔
657

658
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
2,804✔
659
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
2,804✔
660
  }
661
  pOperator->pPhyNode = pNode;
2,804✔
662
  if (!pMlExtInfo || !pOperator) {
2,804✔
663
    code = terrno;
×
664
    goto _error;
×
665
  }
666

667
  pMlExtInfo->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
2,804✔
668
  if (!pMlExtInfo->pExtW) {
2,804✔
669
    code = terrno;
×
670
    goto _error;
×
671
  }
672

673
  SExternalWindowOperator* pExtW = pMlExtInfo->pExtW;
2,804✔
674
  SExprSupp* pSup = &pOperator->exprSupp;
2,804✔
675
  pSup->hasWindowOrGroup = true;
2,804✔
676
  pMlExtInfo->curTs = INT64_MIN;
2,804✔
677

678
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
2,804✔
679
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
2,804✔
680
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
2,804✔
681
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
2,804✔
682

683
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,804✔
684
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2,804✔
685

686
  int32_t num = 0;
2,804✔
687
  SExprInfo* pExprInfo = NULL;
2,804✔
688
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
2,804✔
689
  QUERY_CHECK_CODE(code, lino, _error);
2,804✔
690

691
  if (pExtW->mode == EEXT_MODE_AGG) {
2,804✔
692
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
2,804✔
693
                      &pTaskInfo->storageAPI.functionStore);
694
    QUERY_CHECK_CODE(code, lino, _error);
2,804✔
695
  }
696

697
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
2,804✔
698
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,804✔
699
  initBasicInfo(&pExtW->binfo, pResBlock);
2,804✔
700

701
  pExtW->pWins = taosArrayInit(4096, sizeof(STimeWindow));
2,804✔
702
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
2,804✔
703

704
  pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
2,804✔
705
  TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
2,804✔
706

707
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
2,804✔
708
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
2,804✔
709
  QUERY_CHECK_CODE(code, lino, _error);
2,804✔
710
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMlExtInfo, pTaskInfo);
2,804✔
711
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignExtWinNext, NULL,
2,804✔
712
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
713
                                         optrDefaultGetNextExtFn, NULL);
714
  setOperatorResetStateFn(pOperator, resetMergeAlignedExtWinOperator);
2,804✔
715

716
  code = appendDownstream(pOperator, &pDownstream, 1);
2,804✔
717
  QUERY_CHECK_CODE(code, lino, _error);
2,804✔
718
  *ppOptrOut = pOperator;
2,804✔
719
  return code;
2,804✔
720
  
721
_error:
×
722
  if (pMlExtInfo) destroyMergeAlignedExternalWindowOperator(pMlExtInfo);
×
723
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
724
  pTaskInfo->code = code;
×
725
  return code;
×
726
}
727

728
static int32_t resetExternalWindowExprSupp(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
89,920✔
729
                                           SExternalWindowPhysiNode* pPhynode) {
730
  int32_t    code = 0, lino = 0, num = 0;
89,920✔
731
  SExprInfo* pExprInfo = NULL;
89,920✔
732
  cleanupExprSuppWithoutFilter(&pExtW->scalarSupp);
89,920✔
733

734
  SNodeList* pNodeList = NULL;
89,920✔
735
  if (pPhynode->window.pProjs) {
89,920✔
736
    pNodeList = pPhynode->window.pProjs;
×
737
  } else {
738
    pNodeList = pPhynode->window.pExprs;
89,920✔
739
  }
740

741
  code = createExprInfo(pNodeList, NULL, &pExprInfo, &num);
89,920✔
742
  QUERY_CHECK_CODE(code, lino, _error);
89,920✔
743
  code = initExprSupp(&pExtW->scalarSupp, pExprInfo, num, &pTaskInfo->storageAPI.functionStore);
89,920✔
744
  QUERY_CHECK_CODE(code, lino, _error);
89,920✔
745
  return code;
89,920✔
746
_error:
×
747
  if (code != TSDB_CODE_SUCCESS) {
×
748
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
749
    pTaskInfo->code = code;
×
750
  }
751
  return code;
×
752
}
753

754
/*
 * Resets an external window operator so that it can be executed again.
 *
 * The operator status goes back to OP_NOT_OPENED, the result-row info is re-initialised, the
 * window array is cleared, the last block node is recycled, the scalar expression support is
 * rebuilt from the physical plan node and the time-window column data is re-created.
 *
 * Returns 0 on success; otherwise the failing step's code, which is also logged.
 * outWinIdx and lastSKey are only reset when every earlier step succeeded.
 */
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  SExecTaskInfo*            pTask = pOperator->pTaskInfo;
  SExternalWindowOperator*  pWinOp = pOperator->info;
  SExternalWindowPhysiNode* pPlan = (SExternalWindowPhysiNode*)pOperator->pPhyNode;

  pOperator->status = OP_NOT_OPENED;
  initResultRowInfo(&pWinOp->binfo.resultRowInfo);

  // drop the window / output bookkeeping of the previous run
  pWinOp->outputWinId = 0;
  pWinOp->lastWinId = -1;
  taosArrayClear(pWinOp->pWins);
  extWinRecycleBlkNode(pWinOp, &pWinOp->pLastBlkNode);

  // rebuild the pieces that are derived from the plan node and the task window
  TAOS_CHECK_EXIT(resetExternalWindowExprSupp(pWinOp, pTask, pPlan));

  colDataDestroy(&pWinOp->twAggSup.timeWindowData);
  TAOS_CHECK_EXIT(initExecTimeWindowInfo(&pWinOp->twAggSup.timeWindowData, &pTask->window));

  pWinOp->outWinIdx = 0;
  pWinOp->lastSKey = INT64_MIN;

  qDebug("%s ext window stat at reset, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64,
         pTask->id.str, pWinOp->stat.resBlockCreated, pWinOp->stat.resBlockDestroyed, pWinOp->stat.resBlockRecycled,
         pWinOp->stat.resBlockReused, pWinOp->stat.resBlockAppend);

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pTask->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
796

797
/*
 * Node visitor: sets *(bool*)res to true and ends the traversal (DEAL_RES_END) as soon as a
 * function node is met whose funcId, or whose originalFuncId when hasOriginalFunc is set,
 * is a count-like function. Every other node lets the traversal continue.
 */
static EDealRes extWinHasCountLikeFunc(SNode* pNode, void* res) {
  if (QUERY_NODE_FUNCTION != nodeType(pNode)) {
    return DEAL_RES_CONTINUE;
  }

  SFunctionNode* pFuncNode = (SFunctionNode*)pNode;

  bool countLike = fmIsCountLikeFunc(pFuncNode->funcId);
  if (!countLike && pFuncNode->hasOriginalFunc) {
    // only consult the original function when the current one did not already match
    countLike = fmIsCountLikeFunc(pFuncNode->originalFuncId);
  }

  if (!countLike) {
    return DEAL_RES_CONTINUE;
  }

  *(bool*)res = true;
  return DEAL_RES_END;
}
807

808

809
/*
 * Builds a one-row data block whose columns are all NULL.
 *
 * Nothing is created (TSDB_CODE_SUCCESS is returned and *ppBlock is left untouched) when
 * tsCountAlwaysReturnValue is off or the operator has no count function (pExtW->hasCountFunc).
 *
 * Otherwise the block gets enough TSDB_DATA_TYPE_NULL columns to cover the highest slot id
 * referenced by any column parameter of the operator's expressions, one row is reserved and
 * that row is set to NULL in every column.
 *
 * On success *ppBlock receives the new block. On failure the partially built block is destroyed,
 * *ppBlock is left untouched and the error code is returned.
 */
static int32_t extWinCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pBlock = NULL;
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SExternalWindowOperator* pExtW = pOperator->info;

  if (!pExtW->hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    // template for every column appended below
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        // only column parameters reference a slot of the input block
        continue;
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId >= numOfCols) {
        code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
        QUERY_CHECK_CODE(code, lino, _end);

        // grow the column array until the referenced slot exists
        for (int32_t k = numOfCols; k < slotId + 1; ++k) {
          void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
          QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
        }
      }
    }
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
    colDataSetNULL(pColInfoData, 0);
  }
  *ppBlock = pBlock;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    // Log lino (the variable handed to the QUERY_CHECK_* macros, presumably set to the failing
    // line) rather than __LINE__, which would always be the line of this log statement.
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
875

876

877

878
static int extWinTsWinCompare(const void* pLeft, const void* pRight) {
6,287,463✔
879
  int64_t ts = *(int64_t*)pLeft;
6,287,463✔
880
  SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)pRight;
6,287,463✔
881
  if (ts < pWin->tw.skey) {
6,287,463✔
882
    return -1;
2,218,543✔
883
  }
884
  if (ts >= pWin->tw.ekey) {
4,068,920✔
885
    return 1;
120,202✔
886
  }
887

888
  return 0;
3,948,718✔
889
}
890

891

892
/*
 * Finds the window in which processing of a block should start.
 *
 * The first timestamp (tsCol[0]) is looked up in pExtW->pWins with taosArraySearchIdx(); when a
 * window contains it, *startPos is set to 0 and that window's index is returned.
 *
 * Otherwise rows 1..rowNum-1 are walked together with the windows: a window is skipped once a
 * row's timestamp is at or beyond its ekey, and a row is skipped while it is before the current
 * window's skey. The first (row, window) pair where the row falls inside the window yields
 * *startPos = row and the window index as return value.
 * NOTE(review): the single forward pass presumes ascending timestamps and windows ordered by
 * start key - confirm against the callers.
 *
 * Returns -1 when no row of the block lies inside any window (*startPos is then untouched).
 */
static int32_t extWinGetMultiTbWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol, int64_t rowNum, int32_t* startPos) {
  int32_t idx = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
  if (idx >= 0) {
    *startPos = 0;
    return idx;
  }

  SExtWinTimeWindow* pWin = NULL;
  int32_t            w = 0;
  for (int64_t i = 1; i < rowNum; ++i) {
    for (; w < pExtW->pWins->size; ++w) {
      pWin = TARRAY_GET_ELEM(pExtW->pWins, w);
      if (tsCol[i] < pWin->tw.skey) {
        // row is before this window: keep the window, try the next row
        break;
      }

      if (tsCol[i] < pWin->tw.ekey) {
        *startPos = i;
        return w;
      }
    }

    if (w >= pExtW->pWins->size) {
      // every window has been passed; the remaining rows cannot match, so stop scanning them
      break;
    }
  }

  return -1;
}
917

918
/*
 * Window iterator for one input block (variant named "novlp" in its log messages).
 *
 * Each call advances pExtW->blkWinIdx (taken from extWinGetCurWinIdx() when it is still negative,
 * incremented otherwise) and searches, starting at row *startPos, for the next window that
 * contains a row of the block.
 *
 * On a hit: the current window index is stored with extWinSetCurWinIdx(), *ppWin is the window,
 * *startPos the first row inside it and *winRows the row count reported by
 * getNumOfRowsInTimeWindow() up to ekey - 1.
 * When nothing (more) matches, *ppWin is set to NULL.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t extWinGetNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  // all rows of the block have been consumed already
  if ((*startPos) >= pInfo->rows) {
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // pick the window to start from: the task's current one, or the one after the previous hit
  if (pExtW->blkWinIdx >= 0) {
    pExtW->blkWinIdx++;
  } else {
    pExtW->blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
  }

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // the whole block ends before the candidate window starts
  SExtWinTimeWindow* pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pCur->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pCur->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t row = *startPos;

  qDebug("%s %s start to get novlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, row);

  // TODO handle desc order
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
    pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // skip the rows that precede this window
    while (row < pInfo->rows && tsCol[row] < pCur->tw.skey) {
      ++row;
    }

    if (row == pInfo->rows) {
      // block exhausted without reaching the window
      break;
    }

    if (tsCol[row] >= pCur->tw.ekey) {
      // the row is already past this window: try the next window with the same row
      continue;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = pCur;
    *startPos = row;
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pCur->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pCur->tw.skey, pCur->tw.ekey, *winRows, row, tsCol[row], tsCol[row + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
986

987
/*
 * Window iterator for one input block (variant named "ovlp" in its log messages).
 *
 * Each call advances pExtW->blkWinIdx (taken from pExtW->blkWinStartIdx when it is still
 * negative, incremented otherwise). For every candidate window the row scan restarts at
 * pExtW->blkRowStartIdx, which is moved forward past each row that lies before the window.
 *
 * On a hit: the current window index is stored with extWinSetCurWinIdx(), *ppWin is the window,
 * *startPos the first row inside it and *winRows the row count reported by
 * getNumOfRowsInTimeWindow() up to ekey - 1. When the window ends before the block does,
 * blkWinStartIdx is set to the following window and blkWinStartSet is raised.
 * When nothing (more) matches, *ppWin is set to NULL.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t extWinGetOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  // choose the window to start from: the remembered start window, or the one after the last hit
  if (pExtW->blkWinIdx >= 0) {
    pExtW->blkWinIdx++;
  } else {
    pExtW->blkWinIdx = pExtW->blkWinStartIdx;
  }

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // the whole block ends before the candidate window starts
  SExtWinTimeWindow* pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pCur->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pCur->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int64_t row = 0;

  qDebug("%s %s start to get ovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);

  // TODO handle desc order
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
    pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // rows before this window are skipped and never revisited: the shared row start moves along
    row = pExtW->blkRowStartIdx;
    while (row < pInfo->rows && tsCol[row] < pCur->tw.skey) {
      ++row;
      pExtW->blkRowStartIdx = row;
    }

    if (row >= pInfo->rows) {
      // block exhausted before reaching this window
      if (!pExtW->blkWinStartSet) {
        pExtW->blkWinStartIdx = pExtW->blkWinIdx;
      }

      break;
    }

    if (tsCol[row] >= pCur->tw.ekey) {
      // the row is already past this window: move on to the next window
      continue;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = pCur;
    *startPos = row;
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pCur->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pCur->tw.skey, pCur->tw.ekey, *winRows, (int32_t)row, tsCol[row], tsCol[row + *winRows - 1]);

    if ((row + *winRows) < pInfo->rows) {
      // rows remain after this window, so record the following window as the start window
      pExtW->blkWinStartIdx = pExtW->blkWinIdx + 1;
      pExtW->blkWinStartSet = true;
    }

    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
1058

1059

1060
/*
 * Window iterator for one input block (variant named "mnovlp" in its log messages).
 *
 * While pExtW->blkWinIdx is negative the starting window is located from the block's timestamps
 * with extWinGetMultiTbWinFromTs(), which also sets *startPos. On later calls blkWinIdx is
 * incremented and the search continues from row *startPos.
 *
 * On a hit: the current window index is stored with extWinSetCurWinIdx(), *ppWin is the window,
 * *startPos the first row inside it and *winRows the row count reported by
 * getNumOfRowsInTimeWindow() up to ekey - 1.
 * When nothing (more) matches, *ppWin is set to NULL.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t extWinGetMultiTbNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  // all rows of the block have been consumed already
  if ((*startPos) >= pInfo->rows) {
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  if (pExtW->blkWinIdx < 0) {
    // no window selected yet: locate it from the block's timestamps
    pExtW->blkWinIdx = extWinGetMultiTbWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
    if (pExtW->blkWinIdx < 0) {
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block",
             GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
      *ppWin = NULL;
      return TSDB_CODE_SUCCESS;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);

    SExtWinTimeWindow* pFirst = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
    *ppWin = pFirst;
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, pFirst->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pFirst->tw.skey, pFirst->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  // continue with the window after the previous hit
  pExtW->blkWinIdx++;

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // the whole block ends before the candidate window starts
  SExtWinTimeWindow* pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pCur->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pCur->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t row = *startPos;

  qDebug("%s %s start to get mnovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, row);

  // TODO handle desc order
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
    pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // skip the rows that precede this window
    while (row < pInfo->rows && tsCol[row] < pCur->tw.skey) {
      ++row;
    }

    if (row == pInfo->rows) {
      // block exhausted without reaching the window
      break;
    }

    if (tsCol[row] >= pCur->tw.ekey) {
      // the row is already past this window: try the next window with the same row
      continue;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = pCur;
    *startPos = row;
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pCur->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pCur->tw.skey, pCur->tw.ekey, *winRows, row, tsCol[row], tsCol[row + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
1143

1144
/*
 * Finds the first window in which processing of a block should start.
 *
 * The first timestamp (tsCol[0]) is looked up in pExtW->pWins with taosArraySearchIdx(). When a
 * window contains it, the search walks backwards over the directly preceding windows that also
 * contain that timestamp, so the lowest such index is returned, with *startPos set to 0.
 *
 * Otherwise rows 1..rowNum-1 are walked together with the windows: a window is skipped once a
 * row's timestamp is at or beyond its ekey, and a row is skipped while it is before the current
 * window's skey. The first (row, window) pair where the row falls inside the window yields
 * *startPos = row and the window index as return value.
 * NOTE(review): the single forward pass presumes ascending timestamps and windows ordered by
 * start key - confirm against the callers.
 *
 * Returns -1 when no row of the block lies inside any window (*startPos is then untouched).
 */
static int32_t extWinGetFirstWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol,
                                       int64_t rowNum, int32_t* startPos) {
  SExtWinTimeWindow* pWin = NULL;
  int32_t            idx = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
  if (idx >= 0) {
    // earlier windows may contain the same timestamp: step back to the first of them
    for (int i = idx - 1; i >= 0; --i) {
      pWin = TARRAY_GET_ELEM(pExtW->pWins, i);
      if (extWinTsWinCompare(tsCol, pWin) == 0) {
        idx = i;
      } else {
        break;
      }
    }
    *startPos = 0;
    return idx;
  }

  pWin = NULL;
  int32_t w = 0;
  for (int64_t i = 1; i < rowNum; ++i) {
    for (; w < pExtW->pWins->size; ++w) {
      pWin = TARRAY_GET_ELEM(pExtW->pWins, w);
      if (tsCol[i] < pWin->tw.skey) {
        // row is before this window: keep the window, try the next row
        break;
      }

      if (tsCol[i] < pWin->tw.ekey) {
        *startPos = i;
        return w;
      }
    }

    if (w >= pExtW->pWins->size) {
      // every window has been passed; the remaining rows cannot match, so stop scanning them
      break;
    }
  }

  return -1;
}
1179

1180
/*
 * Window iterator for one input block (variant named "movlp" in its log messages).
 *
 * While pExtW->blkWinIdx is negative the starting window is located from the block's timestamps
 * with extWinGetFirstWinFromTs(), which also sets *startPos. On later calls blkWinIdx is
 * incremented; for every candidate window the row scan restarts at pExtW->blkRowStartIdx, which
 * is moved forward past each row that lies before the window.
 *
 * On a hit: the current window index is stored with extWinSetCurWinIdx(), *ppWin is the window,
 * *startPos the first row inside it and *winRows the row count reported by
 * getNumOfRowsInTimeWindow() up to ekey - 1.
 * When nothing (more) matches, *ppWin is set to NULL.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t extWinGetMultiTbOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
  SExternalWindowOperator* pExtW = pOperator->info;

  if (pExtW->blkWinIdx < 0) {
    // no window selected yet: locate the first one from the block's timestamps
    pExtW->blkWinIdx = extWinGetFirstWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
    if (pExtW->blkWinIdx < 0) {
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block",
             GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
      *ppWin = NULL;
      return TSDB_CODE_SUCCESS;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);

    SExtWinTimeWindow* pFirst = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
    *ppWin = pFirst;
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, pFirst->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pFirst->tw.skey, pFirst->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  // continue with the window after the previous hit
  pExtW->blkWinIdx++;

  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // the whole block ends before the candidate window starts
  SExtWinTimeWindow* pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
  if (tsCol[pInfo->rows - 1] < pCur->tw.skey) {
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block",
           GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pCur->tw.skey);
    *ppWin = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int64_t row = 0;

  qDebug("%s %s start to get movlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);

  // TODO handle desc order
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
    pCur = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);

    // rows before this window are skipped and never revisited: the shared row start moves along
    row = pExtW->blkRowStartIdx;
    while (row < pInfo->rows && tsCol[row] < pCur->tw.skey) {
      ++row;
      pExtW->blkRowStartIdx = row;
    }

    if (row >= pInfo->rows) {
      // block exhausted before reaching this window
      break;
    }

    if (tsCol[row] >= pCur->tw.ekey) {
      // the row is already past this window: move on to the next window
      continue;
    }

    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
    *ppWin = pCur;
    *startPos = row;
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, row, pCur->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);

    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
           GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pCur->tw.skey, pCur->tw.ekey, *winRows, (int32_t)row, tsCol[row], tsCol[row + *winRows - 1]);

    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it",
         GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);

  *ppWin = NULL;
  return TSDB_CODE_SUCCESS;
}
1257

1258

1259
/*
 * Locates the first row after lastEndPos whose timestamp is at or after win.skey.
 *
 * Return values (ascending order):
 *    0  a start row was found; *nextPos holds its index
 *   -2  the window ends at or before the block start (win.ekey <= window.skey), or at or before
 *       the next candidate row, so no row of this block belongs to it
 *   -1  the window starts after the block end (win.skey > window.ekey), or no remaining row
 *       reaches win.skey
 *
 * The scan is bounded by pBlockInfo->rows so it can never read past the timestamp column.
 *
 * NOTE(review): only TSDB_ORDER_ASC is implemented. The previous unbounded loop never terminated
 * for any other order; -1 is returned instead. Confirm this is the desired result once
 * descending order is supported.
 */
static int32_t extWinGetWinStartPos(STimeWindow win, const SDataBlockInfo* pBlockInfo, int32_t lastEndPos, int32_t order, int32_t* nextPos, int64_t* tsCol) {
  if (order != TSDB_ORDER_ASC) {
    return -1;
  }

  if (win.ekey <= pBlockInfo->window.skey) {
    return -2;
  }

  if (win.skey > pBlockInfo->window.ekey) {
    return -1;
  }

  for (int64_t pos = (int64_t)lastEndPos + 1; pos < pBlockInfo->rows; ++pos) {
    if (win.ekey <= tsCol[pos]) {
      // the window is already over before this row starts
      return -2;
    }

    if (win.skey <= tsCol[pos]) {
      *nextPos = (int32_t)pos;
      return 0;
    }
  }

  // ran out of rows before reaching the window start
  return -1;
}
1279

1280
static int32_t extWinAggSetWinOutputBuf(SOperatorInfo* pOperator, SExtWinTimeWindow* win, SExprSupp* pSupp, 
9,611,264✔
1281
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
1282
  int32_t code = 0, lino = 0;
9,611,264✔
1283
  SResultRow* pResultRow = NULL;
9,611,264✔
1284
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
9,611,264✔
1285
  
1286
#if 0
1287
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
1288
                                                  true, tableGroupId, pTaskInfo, true, pAggSup, true);
1289
  if (pResultRow == NULL) {
1290
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
1291
    return pTaskInfo->code;
1292
  }
1293

1294
  qDebug("current result rows num:%d", tSimpleHashGetSize(pAggSup->pResultRowHashTable));
1295

1296
#else
1297
  if (win->winOutIdx >= 0) {
9,611,264✔
1298
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
9,372,208✔
1299
  } else {
1300
    win->winOutIdx = pExtW->outWinIdx++;
239,056✔
1301
    
1302
    qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
239,056✔
1303

1304
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
239,056✔
1305
    
1306
    memset(pResultRow, 0, pAggSup->resultRowSize);
239,056✔
1307

1308
    pResultRow->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
239,056✔
1309
    TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
238,665✔
1310
  }
1311
#endif
1312

1313
  // set time window for current result
1314
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
9,610,485✔
1315

1316
_exit:
9,611,264✔
1317
  
1318
  if (code) {
9,611,264✔
1319
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1320
  }
1321

1322
  return code;
9,611,264✔
1323
}
1324

1325
static int32_t extWinAggDo(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
11,883,104✔
1326
                                  SSDataBlock* pInputBlock) {
1327
  if (forwardRows == 0) return 0;
11,883,104✔
1328
  SExprSupp*               pSup = &pOperator->exprSupp;
11,883,104✔
1329
  SExternalWindowOperator* pExtW = pOperator->info;
11,883,104✔
1330
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
23,754,524✔
1331
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
11,883,104✔
1332

1333
}
1334

1335
/*
 * Decides whether the most recently opened output window (slot outWinIdx - 1) counts as closed.
 *
 * false: no output window exists yet, or the operator is in multi-table mode with unordered input.
 * true : there is no time range expression, or it does not need calculation;
 *        or the last window's block list is empty;
 *        or the last entry of the tail node's index array is below pExtW->blkWinStartIdx.
 * false in every remaining case.
 */
static bool extWinLastWinClosed(SExternalWindowOperator* pExtW) {
  if (pExtW->outWinIdx <= 0) {
    return false;
  }

  if (pExtW->multiTableMode && !pExtW->inputHasOrder) {
    return false;
  }

  if (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc) {
    return true;
  }

  SList* pBlocks = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outWinIdx - 1);
  if (0 == listNEles(pBlocks)) {
    return true;
  }

  // node payload is two pointers wide; the second one is the per-block index array
  SListNode* pTail = listTail(pBlocks);
  SArray*    pIdxArr = *((SArray**)pTail->data + 1);

  // NOTE(review): only the leading int32 of the 8-byte entry is compared - presumably the
  // window index; confirm against the code that fills this array.
  int64_t* pLast = taosArrayGetLast(pIdxArr);

  return pLast && *(int32_t*)pLast < pExtW->blkWinStartIdx;
}
1358

1359
/*
 * Fetches the result block (*ppRes) and its index array (*ppIdx) into which `rows` rows of the
 * given window are to be written.
 *
 * The window's block list is resolved through pWin->winOutIdx:
 *   - already assigned (>= 0): the list stored in that slot of pExtW->pOutputBlocks is used;
 *   - not assigned and extWinLastWinClosed() is true: the window takes over the last slot
 *     (outWinIdx - 1);
 *   - otherwise: a new slot is taken (outWinIdx is post-incremented), whatever list was in that
 *     slot is passed to extWinRecycleBlockList() and a new list is stored there.
 * The block itself comes from extWinGetLastBlockFromList().
 *
 * Returns 0 on success, otherwise an error code (logged).
 */
static int32_t extWinGetWinResBlock(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SList*                   pList = NULL;
  SExternalWindowOperator* pExtW = pOperator->info;

  if (pWin->winOutIdx >= 0) {
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
  } else if (extWinLastWinClosed(pExtW)) {
    pWin->winOutIdx = pExtW->outWinIdx - 1;
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
  } else {
    pWin->winOutIdx = pExtW->outWinIdx++;

    // each list node carries two pointers
    pList = tdListNew(POINTER_BYTES * 2);
    TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);

    // hand the slot's previous list to the recycler before installing the new one
    SList** ppSlot = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
    extWinRecycleBlockList(pExtW, ppSlot);
    *ppSlot = pList;
  }

  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1390

1391
/*
 * Projects `rows` rows of pInputBlock, starting at startPos, into the result block of pWin.
 *
 * Steps: obtain the window's result block and index array; copy the row range into the
 * operator's temporary block (created from pInputBlock on first use, cleaned up otherwise);
 * apply the scalar expressions of pExtW->scalarSupp from the temporary block into the result
 * block; finally append the task's current window index to the index array via
 * extWinAppendWinIdx().
 *
 * Returns 0 on success, otherwise the failing step's code (logged).
 */
static int32_t extWinProjectDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExprSupp*               pScalar = &pExtW->scalarSupp;
  SSDataBlock*             pOut = NULL;
  SArray*                  pWinIdx = NULL;

  TAOS_CHECK_EXIT(extWinGetWinResBlock(pOperator, rows, pWin, &pOut, &pWinIdx));

  qDebug("%s %s win[%" PRId64 ", %" PRId64 "] got res block %p winRowIdx %p, winOutIdx:%d, capacity:%d",
         pOperator->pTaskInfo->id.str, __func__, pWin->tw.skey, pWin->tw.ekey, pOut, pWinIdx, pWin->winOutIdx, pOut->info.capacity);

  // the temporary block is created once and reused afterwards
  if (pExtW->pTmpBlock) {
    blockDataCleanup(pExtW->pTmpBlock);
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
  }

  // reserve room for at least one row
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));

  qDebug("%s %s start to copy %d rows to tmp blk", pOperator->pTaskInfo->id.str, __func__, rows);
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));

  qDebug("%s %s start to apply project to tmp blk", pOperator->pTaskInfo->id.str, __func__);
  TAOS_CHECK_EXIT(projectApplyFunctionsWithSelect(pScalar->pExprInfo, pOut, pExtW->pTmpBlock, pScalar->pCtx, pScalar->numOfExprs,
                                                  NULL, GET_STM_RTINFO(pOperator->pTaskInfo), true, pScalar->hasIndefRowsFunc));

  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pWinIdx, pOut, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  } else {
    qDebug("%s %s project succeed", pOperator->pTaskInfo->id.str, __func__);
  }

  return code;
}
1430

1431
/*
 * Runs the projection over every window that has rows in pInputBlock.
 *
 * The operator's window getter (pExtW->getWinFp) is called repeatedly; each window it yields is
 * projected with extWinProjectDo() and the row cursor is moved past the rows just handled. The
 * loop stops when the getter reports no further window (pWin == NULL).
 *
 * Returns 0 on success, otherwise the first error code met (logged).
 */
static int32_t extWinProjectOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
  SExtWinTimeWindow*       pWin = NULL;
  int32_t                  startPos = 0;
  int32_t                  winRows = 0;

  for (;;) {
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
    if (pWin == NULL) {
      // no further window for this block
      break;
    }

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") project start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);

    TAOS_CHECK_EXIT(extWinProjectDo(pOperator, pInputBlock, startPos, winRows, pWin));

    // continue behind the rows of the window just projected
    startPos += winRows;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1461

1462
/*
 * Evaluate the indefinite-rows expressions of the operator for one window.
 *
 * Steps:
 *   1. if scalar pre-expressions exist, apply them to pBlock in place;
 *   2. bind pBlock as the input of the operator's expression contexts;
 *   3. grow pRes so that it can hold its current rows plus all rows of pBlock;
 *   4. apply the operator's expressions, writing into pRes.
 *
 * pOperator  operator owning the expression contexts
 * pRes       result block receiving the output
 * pBlock     block holding the rows of the current window
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinIndefRowsDoImpl(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExprSupp*               pPreCalc = &pExtW->scalarSupp;
  SExprSupp*               pFuncSup = &pOperator->exprSupp;
  const int32_t            tsOrder = pExtW->binfo.inputTsOrder;
  const int32_t            blkScanFlag = pBlock->info.scanFlag;

  if (NULL != pPreCalc->pExprInfo) {
    TAOS_CHECK_EXIT(projectApplyFunctions(pPreCalc->pExprInfo, pBlock, pBlock, pPreCalc->pCtx, pPreCalc->numOfExprs,
                                          pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
  }

  TAOS_CHECK_EXIT(setInputDataBlock(pFuncSup, pBlock, tsOrder, blkScanFlag, false));

  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));

  TAOS_CHECK_EXIT(projectApplyFunctions(pFuncSup->pExprInfo, pRes, pBlock, pFuncSup->pCtx, pFuncSup->numOfExprs,
                                        pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1492

1493
static int32_t extWinIndefRowsSetWinOutputBuf(SExternalWindowOperator* pExtW, SExtWinTimeWindow* win, SExprSupp* pSupp, 
×
1494
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo, bool reset) {
1495
  int32_t code = 0, lino = 0;
×
1496
  SResultRow* pResultRow = NULL;
×
1497

1498
  pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
×
1499
  
1500
  qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
×
1501

1502
  if (reset) {
×
1503
    memset(pResultRow, 0, pAggSup->resultRowSize);
×
1504
    for (int32_t k = 0; k < pSupp->numOfExprs; ++k) {
×
1505
      SqlFunctionCtx* pCtx = &pSupp->pCtx[k];
×
1506
      pCtx->pOutput = NULL;
×
1507
    }
1508
  }
1509

1510
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
×
1511

1512
  // set time window for current result
1513
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
×
1514

1515
_exit:
×
1516
  
1517
  if (code) {
×
1518
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1519
  }
1520

1521
  return code;
×
1522
}
1523

1524
/*
 * Locate (or create) the output block list of window `pWin`, bind the matching
 * result row to the expression contexts, and return the block/index array that
 * should receive `rows` more rows.
 *
 * Three cases:
 *   - the window already has an output slot (winOutIdx >= 0): reuse its list
 *     and bind the result row without resetting it;
 *   - extWinLastWinClosed() is true: the window takes the most recently
 *     assigned slot (outWinIdx - 1) and the result row is reset;
 *   - otherwise a new slot is taken: whatever list occupied it is recycled, a
 *     fresh list is stored there, and the result row is reset.
 *
 * pOperator  external window operator
 * rows       number of rows about to be appended
 * pWin       window being processed; its winOutIdx may be assigned here
 * ppRes      out: result block to append to
 * ppIdx      out: window-index array paired with that block
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered. On failure to
 * obtain the slot or to allocate the list, neither pWin->winOutIdx nor
 * pExtW->outWinIdx has been modified.
 */
static int32_t extWinGetSetWinResBlockBuf(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SList*                   pList = NULL;
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;

  if (pWin->winOutIdx >= 0) {
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, false));
  } else {
    if (extWinLastWinClosed(pExtW)) {
      pWin->winOutIdx = pExtW->outWinIdx - 1;
      pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
    } else {
      // Resolve the slot and allocate the list BEFORE touching any index, so a
      // failure cannot leave the window pointing at a slot that was never set up.
      SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pExtW->outWinIdx);
      TSDB_CHECK_NULL(ppList, code, lino, _exit, terrno);

      pList = tdListNew(POINTER_BYTES * 2);
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);

      pWin->winOutIdx = pExtW->outWinIdx++;
      extWinRecycleBlockList(pExtW, ppList);
      *ppList = pList;
    }
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, true));
  }

  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1557

1558

1559
/*
 * Process the rows [startPos, startPos + rows) of pInputBlock for window pWin
 * in indefinite-rows mode.
 *
 * The row range is copied into the operator's scratch block (pTmpBlock, created
 * from pInputBlock on first use and cleaned on later uses), evaluated by
 * extWinIndefRowsDoImpl() into the window's result block, and finally the
 * current window index is recorded for the appended rows.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinIndefRowsDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTask = pOperator->pTaskInfo;
  SSDataBlock*             pOut = NULL;
  SArray*                  pOutIdx = NULL;

  TAOS_CHECK_EXIT(extWinGetSetWinResBlockBuf(pOperator, rows, pWin, &pOut, &pOutIdx));

  if (NULL != pExtW->pTmpBlock) {
    blockDataCleanup(pExtW->pTmpBlock);
  } else {
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
  }

  // capacity of at least one row is always requested
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));

  TAOS_CHECK_EXIT(extWinIndefRowsDoImpl(pOperator, pOut, pExtW->pTmpBlock));

  TAOS_CHECK_EXIT(extWinAppendWinIdx(pTask, pOutIdx, pOut, extWinGetCurWinIdx(pTask), rows));

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1588

1589

1590
/*
 * Indefinite-rows mode handler for one input block.
 *
 * Asks getWinFp for one window after another; for every window returned, the
 * reported row range of pInputBlock is processed by extWinIndefRowsDo(). The
 * loop stops when getWinFp yields no window.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinIndefRowsOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  int64_t*                 pTsData = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
  bool                     ascending = (TSDB_ORDER_ASC == pExtW->binfo.inputTsOrder);  // only used for logging
  SExtWinTimeWindow*       pHit = NULL;
  int32_t                  firstRow = 0;
  int32_t                  rowCnt = 0;

  while (1) {
    TAOS_CHECK_EXIT(pExtW->getWinFp(pOperator, pTsData, &firstRow, &pInputBlock->info, &pHit, &rowCnt));
    if (!pHit) {
      break;
    }

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") indefRows start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pOperator->pTaskInfo), pHit->tw.skey, pHit->tw.ekey, ascending, firstRow, rowCnt);

    TAOS_CHECK_EXIT(extWinIndefRowsDo(pOperator, pInputBlock, firstRow, rowCnt, pHit));

    // continue right behind the rows just consumed
    firstRow += rowCnt;
  }

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1620

1621
/*
 * Hand out the next buffered result block in scalar / indefinite-rows mode.
 *
 * Starting at pExtW->outputWinId, the first window whose block list is
 * non-empty has its head node popped: the node carries the result block and,
 * right after it, the window-index array that is published through
 * funcInfo.pStreamBlkWinIdx. The popped node is remembered in pLastBlkNode.
 * Windows whose list is missing or empty are skipped; when the popped node was
 * the last one of its window the cursor moves on to the next window.
 *
 * ppRes  out: the block, or NULL when nothing was found or the block has no rows
 *
 * Always returns TSDB_CODE_SUCCESS (nothing in here can fail).
 *
 * NOTE(review): a popped block with zero rows is reported as NULL, which the
 * caller in this file treats as "operator completed" - confirm that an empty
 * block can never precede non-empty ones.
 */
static int32_t extWinNonAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  int32_t                  numOfWin = pExtW->outWinIdx;
  SSDataBlock*             pRes = NULL;

  for (; pExtW->outputWinId < numOfWin; pExtW->outputWinId++, extWinIncCurWinOutIdx(pTaskInfo->pStreamRuntimeInfo)) {
    SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
    // Same guard as extWinNonAggGotResBlock(): tolerate a slot without a list.
    if (NULL == pList || listNEles(pList) <= 0) {
      continue;
    }

    SListNode* pNode = tdListPopHead(pList);
    pRes = *(SSDataBlock**)pNode->data;
    // node payload layout: [SSDataBlock*][SArray* window index]
    pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = *(SArray**)((SArray**)pNode->data + 1);
    pExtW->pLastBlkNode = pNode;

    if (listNEles(pList) <= 0) {
      // `break` skips the loop increment, so advance past the drained window here
      pExtW->outputWinId++;
      extWinIncCurWinOutIdx(pTaskInfo->pStreamRuntimeInfo);
    }

    break;
  }

  if (pRes) {
    qDebug("%s result generated, rows:%" PRId64 , GET_TASKID(pTaskInfo), pRes->info.rows);
    pRes->info.version = pTaskInfo->version;
    pRes->info.dataLoad = 1;
  } else {
    pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = NULL;
    qDebug("%s ext window done", GET_TASKID(pTaskInfo));
  }

  *ppRes = (pRes && pRes->info.rows > 0) ? pRes : NULL;

  return TSDB_CODE_SUCCESS;
}
1666

1667
/*
 * Aggregate mode: produce results for windows that received no input rows.
 *
 * Nothing is filled when the operator has no pEmptyInputBlock, or when pWin is
 * given and starts at the same skey as the last processed window. Otherwise
 * every window with index in (lastWinId, endIdx] is aggregated over row 0 of
 * pEmptyInputBlock, where endIdx is the last window (allRemains) or the window
 * just before the current one.
 *
 * On success the expression contexts are re-bound to pBlock (when non-NULL)
 * and, unless allRemains is set, the current window index is restored to the
 * value it had on entry.
 *
 * pOperator   external window operator
 * pBlock      real input block to restore as expression input; may be NULL
 * allRemains  true: fill up to the last window; false: up to current - 1
 * pWin        window about to be processed by the caller; may be NULL
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinAggHandleEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains, SExtWinTimeWindow* pWin) {
  int32_t code = 0, lino = 0;
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t currIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);

  if (NULL == pExtW->pEmptyInputBlock || (pWin && pWin->tw.skey == pExtW->lastSKey)) {
    goto _exit;
  }

  bool ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
  int32_t endIdx = allRemains ? (pExtW->pWins->size - 1) : (currIdx - 1);
  SSDataBlock* pInput = pExtW->pEmptyInputBlock;

  // Bind the placeholder block only if at least one empty window will be filled.
  if ((pExtW->lastWinId + 1) <= endIdx) {
    TAOS_CHECK_EXIT(setInputDataBlock(pSup, pExtW->pEmptyInputBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
  }

  for (int32_t i = pExtW->lastWinId + 1; i <= endIdx; ++i) {
    // Distinct name: the parameter pWin must not be shadowed.
    SExtWinTimeWindow* pEmptyWin = taosArrayGet(pExtW->pWins, i);
    TSDB_CHECK_NULL(pEmptyWin, code, lino, _exit, terrno);

    extWinSetCurWinIdx(pOperator, i);
    qDebug("%s %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
           GET_TASKID(pOperator->pTaskInfo), i, pEmptyWin->tw.skey, pEmptyWin->tw.ekey, ascScan);

    TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pEmptyWin, pSup, &pExtW->aggSup, pOperator->pTaskInfo));

    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pEmptyWin->tw, 1);
    code = extWinAggDo(pOperator, 0, 1, pInput);
    // lastWinId is advanced even when extWinAggDo failed; the error is raised right after.
    pExtW->lastWinId = i;
    TAOS_CHECK_EXIT(code);
  }

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
  } else {
    if (pBlock) {
      // On failure this jumps back to _exit with a non-zero code, i.e. into the
      // error branch above; it cannot loop.
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true));
    }

    if (!allRemains) {
      extWinSetCurWinIdx(pOperator, currIdx);
    }
  }

  return code;
}
1718

1719
/*
 * Aggregate mode handler for one input block.
 *
 * For every window reported by getWinFp:
 *   - windows skipped since the last processed one are filled through
 *     extWinAggHandleEmptyWins();
 *   - the scalar pre-expressions (if any) are applied to the input block once,
 *     when the first window of the block is reached;
 *   - the output buffer is switched when the window's skey differs from the
 *     last processed skey;
 *   - the reported row range is aggregated by extWinAggDo();
 *   - lastSKey / lastWinId are updated to this window.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t extWinAggOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  int32_t                  code = 0, lino = 0;
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
  SExecTaskInfo*           pTask = pOperator->pTaskInfo;
  SExprSupp*               pPreCalc = &pExtW->scalarSupp;
  int64_t*                 pTs = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pTask);
  bool                     isAsc = (TSDB_ORDER_ASC == pExtW->binfo.inputTsOrder);  // only used for logging
  bool                     preCalcPending = true;
  SExtWinTimeWindow*       pCurWin = NULL;
  int32_t                  rowPos = 0;
  int32_t                  numRows = 0;

  for (;; rowPos += numRows) {
    TAOS_CHECK_EXIT(pExtW->getWinFp(pOperator, pTs, &rowPos, &pInputBlock->info, &pCurWin, &numRows));
    if (NULL == pCurWin) {
      break;
    }

    TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pInputBlock, false, pCurWin));

    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") agg start, ascScan:%d, startPos:%d, winRows:%d",
           GET_TASKID(pTask), pCurWin->tw.skey, pCurWin->tw.ekey, isAsc, rowPos, numRows);

    if (preCalcPending) {
      if (NULL != pPreCalc->pExprInfo) {
        TAOS_CHECK_EXIT(projectApplyFunctions(pPreCalc->pExprInfo, pInputBlock, pInputBlock, pPreCalc->pCtx,
                                              pPreCalc->numOfExprs, pExtW->pPseudoColInfo, GET_STM_RTINFO(pTask)));
      }

      preCalcPending = false;
    }

    if (pExtW->lastSKey != pCurWin->tw.skey) {
      TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pCurWin, &pOperator->exprSupp, &pExtW->aggSup, pTask));
    }

    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pCurWin->tw, 1);
    TAOS_CHECK_EXIT(extWinAggDo(pOperator, rowPos, numRows, pInputBlock));

    pExtW->lastSKey = pCurWin->tw.skey;
    pExtW->lastWinId = extWinGetCurWinIdx(pTask);
  }

_exit:

  if (0 != code) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
  }

  return code;
}
1769

1770
/*
 * Aggregate mode: copy the finished result rows of the windows into the
 * operator's result block.
 *
 * The result block and the window-row-index array are cleared first. Windows
 * are visited from pExtW->outputWinId onwards; windows without an output slot
 * (winOutIdx < 0) or without result rows are skipped. The block is grown on
 * demand. Copying stops early once the block holds at least
 * resultInfo.threshold rows.
 *
 * ppRes  out: the result block, or NULL when it holds no rows
 *
 * On success funcInfo.pStreamBlkWinIdx is set to pExtW->pWinRowIdx.
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee; on
 * error *ppRes is left untouched.
 *
 * NOTE(review): the caller visible in this file marks the operator completed
 * right after one call, so windows left over after the threshold stop are
 * never emitted - confirm whether the threshold can be reached in practice.
 */
static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*             pBlock = pExtW->binfo.pRes;
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExprInfo*               pExprInfo = pOperator->exprSupp.pExprInfo;
  int32_t                  numOfExprs = pOperator->exprSupp.numOfExprs;
  int32_t*                 rowEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;
  SqlFunctionCtx*          pCtx = pOperator->exprSupp.pCtx;
  int32_t                  numOfWin = pExtW->outWinIdx;

  pBlock->info.version = pTaskInfo->version;
  blockDataCleanup(pBlock);
  taosArrayClear(pExtW->pWinRowIdx);

  for (int32_t i = pExtW->outputWinId; i < pExtW->pWins->size; ++i, pExtW->outputWinId += 1) {
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
    int32_t            winIdx = pWin->winOutIdx;
    if (winIdx < 0) {
      continue;
    }
    SResultRow* pRow = (SResultRow*)((char*)pExtW->pResultRow + winIdx * pExtW->aggSup.resultRowSize);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      continue;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      // reserve room for this window plus some headroom for the following ones
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + numOfWin - winIdx;
      TAOS_CHECK_EXIT(blockDataEnsureCapacity(pBlock, newSize));
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
    }

    TAOS_CHECK_EXIT(copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo));

    pBlock->info.rows += pRow->numOfRows;

    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pRow->winIdx, pRow->numOfRows));

    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
      // `break` bypasses the loop increment: move the cursor past the window
      // that was just copied so it can never be emitted a second time.
      pExtW->outputWinId += 1;
      break;
    }
  }

  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);

  pBlock->info.dataLoad = 1;

  *ppRes = (pBlock->info.rows > 0) ? pBlock : NULL;
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;

_exit:

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1835

1836
/*
 * Make sure the result-row buffer can hold `winNum` rows.
 *
 * Nothing is done in scalar mode or when the current capacity already
 * suffices. Otherwise the old buffer is released (capacity becomes -1) and a
 * zero-initialised buffer of winNum * aggSup.resultRowSize bytes is allocated.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (capacity
 * then stays at -1).
 */
static int32_t extWinInitResultRow(SExecTaskInfo*        pTaskInfo, SExternalWindowOperator* pExtW, int32_t winNum) {
  int32_t code = 0, lino = 0;

  bool rowBufUnused = (EEXT_MODE_SCALAR == pExtW->mode);
  if (rowBufUnused || winNum <= pExtW->resultRowCapacity) {
    return TSDB_CODE_SUCCESS;
  }

  // drop the undersized buffer before allocating the replacement
  taosMemoryFreeClear(pExtW->pResultRow);
  pExtW->resultRowCapacity = -1;

  pExtW->pResultRow = taosMemoryCalloc(winNum, pExtW->aggSup.resultRowSize);
  QUERY_CHECK_NULL(pExtW->pResultRow, code, lino, _exit, terrno);

  pExtW->resultRowCapacity = winNum;

_exit:

  if (0 != code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1863

1864
/*
 * Release large buffers once the operator has finished producing output.
 *
 * - The result-row buffer is freed (capacity reset to -1) when its size,
 *   resultRowCapacity * aggSup.resultRowSize, is at least 1 MiB; smaller
 *   buffers are kept so extWinInitResultRow() can reuse them.
 * - The columns of the result block are freed when
 *   rows * aggSup.resultRowSize is at least 1 MiB.
 *
 * Both products are evaluated in 64-bit arithmetic so that a large window
 * count cannot overflow and make a huge buffer look small.
 */
static void extWinFreeResultRow(SExternalWindowOperator* pExtW) {
  const int64_t releaseThreshold = 1048576;  // 1 MiB
  const int64_t rowSize = pExtW->aggSup.resultRowSize;

  if ((int64_t)pExtW->resultRowCapacity * rowSize >= releaseThreshold) {
    taosMemoryFreeClear(pExtW->pResultRow);
    pExtW->resultRowCapacity = -1;
  }
  if (pExtW->binfo.pRes && (int64_t)pExtW->binfo.pRes->info.rows * rowSize >= releaseThreshold) {
    blockDataFreeCols(pExtW->binfo.pRes);
  }
}
82,521✔
1873

1874
/*
 * Build the list of external windows for the current calculation.
 *
 * Does nothing when pExtW->pWins already holds windows. Otherwise one window
 * is reserved per entry of funcInfo.pStreamPesudoFuncVals and the result-row
 * buffer is sized accordingly. The window ranges come from
 *   - scalarCalculateExtWinsTimeRange(), when a time-range expression with
 *     needCalc is present, or
 *   - the trigger parameters: skey = wstart, ekey = wend (+1 unless the
 *     trigger type is STREAM_TRIGGER_SLIDING), winOutIdx = -1.
 * Finally outputWinId and blkWinStartIdx are set to funcInfo.curIdx and
 * lastWinId to -1.
 *
 * NOTE(review): in the time-range-expression branch winOutIdx is not assigned
 * here; presumably scalarCalculateExtWinsTimeRange() initialises it - confirm.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
static int32_t extWinInitWindowList(SExternalWindowOperator* pExtW, SExecTaskInfo*        pTaskInfo) {
  int32_t code = 0, lino = 0;

  if (taosArrayGetSize(pExtW->pWins) > 0) {
    return TSDB_CODE_SUCCESS;
  }

  SStreamRuntimeFuncInfo* pRtInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
  size_t                  numWins = taosArrayGetSize(pRtInfo->pStreamPesudoFuncVals);
  SExtWinTimeWindow*      pWinArr = taosArrayReserve(pExtW->pWins, numWins);
  TSDB_CHECK_NULL(pWinArr, code, lino, _exit, terrno);

  TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, numWins));

  bool rangeFromExpr = (NULL != pExtW->timeRangeExpr) && pExtW->timeRangeExpr->needCalc;
  if (!rangeFromExpr) {
    for (int32_t i = 0; i < numWins; ++i) {
      SSTriggerCalcParam* pParam = taosArrayGet(pRtInfo->pStreamPesudoFuncVals, i);
      SExtWinTimeWindow*  pOne = &pWinArr[i];

      pOne->tw.skey = pParam->wstart;
      pOne->tw.ekey = pParam->wend + ((pRtInfo->triggerType != STREAM_TRIGGER_SLIDING) ? 1 : 0);
      pOne->winOutIdx = -1;

      qDebug("%s the %d/%d ext window initialized, TR[%" PRId64 ", %" PRId64 ")",
          pTaskInfo->id.str, i, (int32_t)numWins, pOne->tw.skey, pOne->tw.ekey);
    }
  } else {
    TAOS_CHECK_EXIT(scalarCalculateExtWinsTimeRange(pExtW->timeRangeExpr, pRtInfo, pWinArr));

    // walk the windows only when debug logging is switched on
    if (qDebugFlag & DEBUG_DEBUG) {
      for (int32_t i = 0; i < numWins; ++i) {
        qDebug("%s the %d/%d ext window calced initialized, TR[%" PRId64 ", %" PRId64 ")",
            pTaskInfo->id.str, i, (int32_t)numWins, pWinArr[i].tw.skey, pWinArr[i].tw.ekey);
      }
    }
  }

  pExtW->outputWinId = pRtInfo->curIdx;
  pExtW->lastWinId = -1;
  pExtW->blkWinStartIdx = pRtInfo->curIdx;

_exit:

  if (0 != code) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }

  return code;
}
1920

1921
/*
 * Decide whether a result block can already be returned in scalar /
 * indefinite-rows mode, before all input has been consumed.
 *
 * Returns false when the operator is in multi-table mode without ordered
 * input. Otherwise returns true when
 *   - more than one window is waiting for output and no time-range expression
 *     needs calculation, or
 *   - the list of the window at outputWinId holds more than one block, or
 *   - it holds exactly one block whose last recorded window index is below
 *     blkWinStartIdx;
 * and false in every other case (including a missing or empty list).
 */
static bool extWinNonAggGotResBlock(SExternalWindowOperator* pExtW) {
  bool orderedInput = !pExtW->multiTableMode || pExtW->inputHasOrder;
  if (!orderedInput) {
    return false;
  }

  int32_t pendingWins = pExtW->outWinIdx - pExtW->outputWinId;
  bool    rangeNeedsCalc = (NULL != pExtW->timeRangeExpr) && pExtW->timeRangeExpr->needCalc;
  if (pendingWins > 1 && !rangeNeedsCalc) {
    return true;
  }

  SList* pBlkList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
  if (NULL == pBlkList || listNEles(pBlkList) <= 0) {
    return false;
  }
  if (listNEles(pBlkList) > 1) {
    return true;
  }

  // exactly one block: look at the window index recorded last for it
  SListNode* pHead = listHead(pBlkList);
  SArray*    pWinIdxArr = *(SArray**)((SArray**)pHead->data + 1);
  int32_t*   pLastIdx = taosArrayGetLast(pWinIdxArr);

  return (NULL != pLastIdx) && (*pLastIdx < pExtW->blkWinStartIdx);
}
1947

1948
/*
 * Open function of the external window operator: pull blocks from the
 * downstream operator and feed them to the handler of the configured mode.
 *
 * - Returns immediately when the operator is already marked opened.
 * - The window list is initialised first (extWinInitWindowList).
 * - For every downstream block the per-block window cursor fields are reset
 *   and the block is dispatched by mode (scalar, aggregate, indefinite rows).
 * - In scalar / indefinite-rows mode the function returns early - WITHOUT
 *   marking the operator opened - as soon as extWinNonAggGotResBlock() reports
 *   an available result block, so that a later call resumes pulling input.
 * - When the downstream is exhausted, aggregate mode fills the remaining
 *   empty windows, blkWinStartIdx is set to the window count, and the
 *   operator is marked opened.
 *
 * Returns TSDB_CODE_SUCCESS. On error the code is stored in pTaskInfo->code
 * and control leaves through T_LONG_JMP.
 */
static int32_t extWinOpen(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SExternalWindowOperator* pExtW = pOperator->info;

  TAOS_CHECK_EXIT(extWinInitWindowList(pExtW, pTaskInfo));

  while (1) {
    // per-block window cursor starts fresh for each downstream block
    pExtW->blkWinIdx = -1;
    pExtW->blkWinStartSet = false;
    pExtW->blkRowStartIdx = 0;

    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      if (EEXT_MODE_AGG == pExtW->mode) {
        TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pBlock, true, NULL));
      }
      pExtW->blkWinStartIdx = pExtW->pWins->size;
      break;
    }

    printDataBlock(pBlock, __func__, pTaskInfo->id.str, pTaskInfo->id.queryId);

    qDebug("ext window mode:%d got %" PRId64 " rows from downstream", pExtW->mode, pBlock->info.rows);

    switch (pExtW->mode) {
      case EEXT_MODE_SCALAR:
        TAOS_CHECK_EXIT(extWinProjectOpen(pOperator, pBlock));
        if (extWinNonAggGotResBlock(pExtW)) {
          return code;  // result ready; operator intentionally stays "not opened"
        }
        break;
      case EEXT_MODE_AGG:
        TAOS_CHECK_EXIT(extWinAggOpen(pOperator, pBlock));
        break;
      case EEXT_MODE_INDEFR_FUNC:
        TAOS_CHECK_EXIT(extWinIndefRowsOpen(pOperator, pBlock));
        if (extWinNonAggGotResBlock(pExtW)) {
          return code;  // result ready; operator intentionally stays "not opened"
        }
        break;
      default:
        break;
    }
  }

  OPTR_SET_OPENED(pOperator);

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return code;
}
2024

2025
/*
 * Next function of the external window operator: return the next result block.
 *
 * - A finished operator (OP_EXEC_DONE) yields NULL right away.
 * - The block node handed out by the previous call is recycled, then the
 *   operator's open function is invoked.
 * - Scalar / indefinite-rows mode: the next buffered block is returned; when
 *   there is none the operator is marked completed and large buffers are
 *   released.
 * - Aggregate mode: the result block is built once, after which the operator
 *   is marked completed and large buffers are released.
 * - totalRows is increased by the rows of the returned block; in stream
 *   execution mode the block is also printed.
 *
 * ppRes  out: result block, or NULL when no (more) data is available
 *
 * Returns TSDB_CODE_SUCCESS. On error the code is stored in pTaskInfo->code
 * and control leaves through T_LONG_JMP.
 */
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return code;
  }

  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);

  TAOS_CHECK_EXIT(pOperator->fpSet._openFn(pOperator));

  if (pExtW->mode == EEXT_MODE_SCALAR || pExtW->mode == EEXT_MODE_INDEFR_FUNC) {
    TAOS_CHECK_EXIT(extWinNonAggOutputRes(pOperator, ppRes));
    if (NULL == *ppRes) {
      setOperatorCompleted(pOperator);
      extWinFreeResultRow(pExtW);
    }
  } else {
    TAOS_CHECK_EXIT(extWinAggOutputRes(pOperator, ppRes));
    setOperatorCompleted(pOperator);
    extWinFreeResultRow(pExtW);
  }

  if (*ppRes) {
    pOperator->resultInfo.totalRows += (*ppRes)->info.rows;
  }

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM && (*ppRes)) {
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
  }

  return code;
}
2080

2081

2082
/*
 * Create an external window operator from its physical plan node.
 *
 * pDownstream: the single downstream operator; appended to the new operator on
 *              success, destroyed together with it on failure.
 * pNode:       physical plan node, interpreted as SExternalWindowPhysiNode.
 * pTaskInfo:   owning task; pTaskInfo->code is set when creation fails.
 * pOptrOut:    receives the new operator on success (checked by QRY_PARAM_CHECK).
 *
 * The operator mode is derived from the plan node: projections present -> scalar,
 * otherwise indefinite-rows function if flagged, otherwise aggregate.
 *
 * Returns 0 on success, otherwise the error code after releasing everything
 * that was allocated here.
 */
int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
                                     SOperatorInfo** pOptrOut) {
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
  QRY_PARAM_CHECK(pOptrOut);
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (!pExtW || !pOperator) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  // Only touch pOperator after the allocation check above has passed.
  pOperator->pPhyNode = pNode;

  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
                  pExtW, pTaskInfo);

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pExtW->binfo, pResBlock);

  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR
                                        : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
  // The plan node's input order is forced to ascending as well.
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;

  if (pTaskInfo->pStreamRuntimeInfo != NULL) {
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
  }

  if (pPhynode->window.pProjs) {
    // Scalar mode: evaluate the projection expressions per window.
    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);

    // QUERY_CHECK_NULL stores terrno into `code`, so the failure is really reported.
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
    QUERY_CHECK_NULL(pExtW->pOutputBlocks, code, lino, _error, terrno);

    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
  } else if (pExtW->mode == EEXT_MODE_AGG) {
    // Aggregate mode: optional pre-computed scalar expressions plus aggregate functions.
    if (pPhynode->window.pExprs != NULL) {
      int32_t    num = 0;
      SExprInfo* pSExpr = NULL;
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
      QUERY_CHECK_CODE(code, lino, _error);

      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
      QUERY_CHECK_CODE(code, lino, _error);

      checkIndefRowsFuncs(&pExtW->scalarSupp);
    }

    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
    initResultSizeInfo(&pOperator->resultInfo, 4096);

    pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
    TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);

    int32_t    num = 0;
    SExprInfo* pExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
    QUERY_CHECK_CODE(code, lino, _error);
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, 0, 0);
    QUERY_CHECK_CODE(code, lino, _error);

    // An empty input block is prepared only when a count-like function is present.
    nodesWalkExprs(pPhynode->window.pFuncs, extWinHasCountLikeFunc, &pExtW->hasCountFunc);
    if (pExtW->hasCountFunc) {
      code = extWinCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
      QUERY_CHECK_CODE(code, lino, _error);
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
    } else {
      qDebug("%s ext window doesn't have CountLikeFunc", pOperator->pTaskInfo->id.str);
    }

    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
    QUERY_CHECK_CODE(code, lino, _error);

    pExtW->lastSKey = INT64_MIN;
  } else {
    // Indefinite-rows function mode.
    size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

    if (pPhynode->window.pExprs != NULL) {
      int32_t    num = 0;
      SExprInfo* pSExpr = NULL;
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
      QUERY_CHECK_CODE(code, lino, _error);

      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
      QUERY_CHECK_CODE(code, lino, _error);
    }

    int32_t    numOfExpr = 0;
    SExprInfo* pExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
    TSDB_CHECK_CODE(code, lino, _error);

    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
                      pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
    TSDB_CHECK_CODE(code, lino, _error);
    pOperator->exprSupp.hasWindowOrGroup = false;

    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                              pTaskInfo->pStreamRuntimeInfo);
    TSDB_CHECK_CODE(code, lino, _error);

    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
    TSDB_CHECK_CODE(code, lino, _error);

    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
    QUERY_CHECK_NULL(pExtW->pOutputBlocks, code, lino, _error, terrno);

    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
  }

  pExtW->pWins = taosArrayInit(4096, sizeof(SExtWinTimeWindow));
  QUERY_CHECK_NULL(pExtW->pWins, code, lino, _error, terrno);

  pExtW->timeRangeExpr = (STimeRangeNode*)pPhynode->pTimeRange;
  if (pExtW->timeRangeExpr) {
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pStart, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pEnd, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
  }

  // Overlapping-window lookup is selected only when a time range expression exists and
  // either it needs calculation or the stream runtime requests sliding overlap.
  // pStreamRuntimeInfo may be NULL (it is NULL-checked above), so guard the dereference.
  bool useOvlpWin = false;
  if (pExtW->timeRangeExpr != NULL) {
    useOvlpWin = pExtW->timeRangeExpr->needCalc ||
                 (pTaskInfo->pStreamRuntimeInfo != NULL &&
                  (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP));
  }

  if (pPhynode->isSingleTable) {
    pExtW->getWinFp = useOvlpWin ? extWinGetOvlpWin : extWinGetNoOvlpWin;
    pExtW->multiTableMode = false;
  } else {
    pExtW->getWinFp = useOvlpWin ? extWinGetMultiTbOvlpWin : extWinGetMultiTbNoOvlpWin;
    pExtW->multiTableMode = true;
  }
  pExtW->inputHasOrder = pPhynode->inputHasOrder;

  pOperator->fpSet = createOperatorFpSet(extWinOpen, extWinNext, NULL, destroyExternalWindowOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
  code = appendDownstream(pOperator, &pDownstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrOut = pOperator;
  return code;

_error:

  if (pExtW != NULL) {
    destroyExternalWindowOperatorInfo(pExtW);
  }

  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
2260

2261

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc