• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4822

27 Oct 2025 05:42AM UTC coverage: 59.732% (+1.0%) from 58.728%
#4822

push

travis-ci

web-flow
Merge pull request #33377 from taosdata/fix/main/rename-udf-path

fix: update UDF example links to correct file paths

121214 of 258518 branches covered (46.89%)

Branch coverage included in aggregate %.

193636 of 268583 relevant lines covered (72.1%)

4002399.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.26
/source/libs/executor/src/externalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22
#include "cmdnodes.h"
23

24
typedef struct SBlockList {
25
  const SSDataBlock* pSrcBlock;
26
  SList*             pBlocks;
27
  int32_t            blockRowNumThreshold;
28
} SBlockList;
29

30

31
typedef int32_t (*extWinGetWinFp)(SOperatorInfo*, int64_t*, int32_t*, SDataBlockInfo*, SExtWinTimeWindow**, int32_t*);
32

33
typedef struct SExtWindowStat {
34
  int64_t resBlockCreated;
35
  int64_t resBlockDestroyed;
36
  int64_t resBlockRecycled;
37
  int64_t resBlockReused;
38
  int64_t resBlockAppend;
39
} SExtWindowStat;
40

41
typedef struct SExternalWindowOperator {
42
  SOptrBasicInfo     binfo;
43
  SExprSupp          scalarSupp;
44
  int32_t            primaryTsIndex;
45
  EExtWinMode        mode;
46
  bool               multiTableMode;
47
  bool               inputHasOrder;
48
  SArray*            pWins;           // SArray<SExtWinTimeWindow>
49
  SArray*            pPseudoColInfo;  
50
  STimeRangeNode*    timeRangeExpr;
51
  
52
  extWinGetWinFp     getWinFp;
53

54
  bool               blkWinStartSet;
55
  int32_t            blkWinStartIdx;
56
  int32_t            blkWinIdx;
57
  int32_t            blkRowStartIdx;
58
  int32_t            outputWinId;
59
  int32_t            outWinIdx;
60

61
  // for project&indefRows
62
  SList*             pFreeBlocks;    // SList<SSDatablock*+SAarray*>
63
  SArray*            pOutputBlocks;  // SArray<SList*>, for each window, we have a list of blocks
64
  SListNode*         pLastBlkNode; 
65
  SSDataBlock*       pTmpBlock;
66
  
67
  // for agg
68
  SAggSupporter      aggSup;
69
  STimeWindowAggSupp twAggSup;
70

71
  int32_t            resultRowCapacity;
72
  SResultRow*        pResultRow;
73

74
  int64_t            lastSKey;
75
  int32_t            lastWinId;
76
  SSDataBlock*       pEmptyInputBlock;
77
  bool               hasCountFunc;
78
  SExtWindowStat     stat;
79
  SArray*            pWinRowIdx;
80
} SExternalWindowOperator;
81

82

83
static int32_t extWinBlockListAddBlock(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
×
84
  SSDataBlock* pRes = NULL;
×
85
  int32_t code = 0, lino = 0;
×
86

87
  if (listNEles(pExtW->pFreeBlocks) > 0) {
×
88
    SListNode* pNode = tdListPopHead(pExtW->pFreeBlocks);
×
89
    *ppBlock = *(SSDataBlock**)pNode->data;
×
90
    *ppIdx = *(SArray**)((SArray**)pNode->data + 1);
×
91
    tdListAppendNode(pList, pNode);
×
92
    pExtW->stat.resBlockReused++;
×
93
  } else {
94
    TAOS_CHECK_EXIT(createOneDataBlock(pExtW->binfo.pRes, false, &pRes));
×
95
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, TMAX(rows, 4096)));
×
96
    SArray* pIdx = taosArrayInit(10, sizeof(int64_t));
×
97
    TSDB_CHECK_NULL(pIdx, code, lino, _exit, terrno);
×
98
    void* res[2] = {pRes, pIdx};
×
99
    TAOS_CHECK_EXIT(tdListAppend(pList, res));
×
100

101
    *ppBlock = pRes;
×
102
    *ppIdx = pIdx;
×
103
    pExtW->stat.resBlockCreated++;
×
104
  }
105
  
106
_exit:
×
107

108
  if (code) {
×
109
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
110
    blockDataDestroy(pRes);
×
111
  }
112
  
113
  return code;
×
114
}
115

116
static int32_t extWinGetLastBlockFromList(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
×
117
  int32_t    code = 0, lino = 0;
×
118
  SSDataBlock* pRes = NULL;
×
119

120
  SListNode* pNode = TD_DLIST_TAIL(pList);
×
121
  if (NULL == pNode) {
×
122
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
×
123
    return code;
×
124
  }
125

126
  pRes = *(SSDataBlock**)pNode->data;
×
127
  if ((pRes->info.rows + rows) > pRes->info.capacity) {
×
128
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
×
129
    return code;
×
130
  }
131

132
  *ppIdx = *(SArray**)((SSDataBlock**)pNode->data + 1);
×
133
  *ppBlock = pRes;
×
134
  pExtW->stat.resBlockAppend++;
×
135

136
_exit:
×
137

138
  if (code) {
×
139
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
140
  }
141
  
142
  return code;
×
143
}
144

145
static void extWinDestroyBlockList(void* p) {
×
146
  if (NULL == p) {
×
147
    return;
×
148
  }
149

150
  SListNode* pTmp = NULL;
×
151
  SList** ppList = (SList**)p;
×
152
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
×
153
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
154
    while (pNode) {
×
155
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
156
      blockDataDestroy(pBlock);
×
157
      SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
×
158
      taosArrayDestroy(pIdx);
×
159
      pTmp = pNode;
×
160
      pNode = pNode->dl_next_;
×
161
      taosMemoryFree(pTmp);
×
162
    }
163
  }
164
  taosMemoryFree(*ppList);
×
165
}
166

167

168
static void extWinRecycleBlkNode(SExternalWindowOperator* pExtW, SListNode** ppNode) {
618✔
169
  if (NULL == ppNode || NULL == *ppNode) {
618!
170
    return;
618✔
171
  }
172

173
  SSDataBlock* pBlock = *(SSDataBlock**)(*ppNode)->data;
×
174
  SArray* pIdx = *(SArray**)((SArray**)(*ppNode)->data + 1);
×
175
  
176
  if (listNEles(pExtW->pFreeBlocks) >= 10) {
×
177
    blockDataDestroy(pBlock);
×
178
    taosArrayDestroy(pIdx);
×
179
    taosMemoryFreeClear(*ppNode);
×
180
    pExtW->stat.resBlockDestroyed++;
×
181
    return;
×
182
  }
183
  
184
  blockDataCleanup(pBlock);
×
185
  taosArrayClear(pIdx);
×
186
  tdListPrependNode(pExtW->pFreeBlocks, *ppNode);
×
187
  *ppNode = NULL;
×
188
  pExtW->stat.resBlockRecycled++;
×
189
}
190

191
static void extWinRecycleBlockList(SExternalWindowOperator* pExtW, void* p) {
×
192
  if (NULL == p) {
×
193
    return;
×
194
  }
195

196
  SListNode* pTmp = NULL;
×
197
  SList** ppList = (SList**)p;
×
198
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
×
199
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
200
    while (pNode) {
×
201
      pTmp = pNode;
×
202
      pNode = pNode->dl_next_;
×
203
      extWinRecycleBlkNode(pExtW, &pTmp);
×
204
    }
205
  }
206
  taosMemoryFree(*ppList);
×
207
}
208
static void extWinDestroyBlkNode(SExternalWindowOperator* pInfo, SListNode* pNode) {
203✔
209
  if (NULL == pNode) {
203!
210
    return;
203✔
211
  }
212

213
  SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
214
  SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
×
215
  
216
  blockDataDestroy(pBlock);
×
217
  taosArrayDestroy(pIdx);
×
218

219
  taosMemoryFree(pNode);
×
220

221
  pInfo->stat.resBlockDestroyed++;
×
222
}
223

224

225
void destroyExternalWindowOperatorInfo(void* param) {
203✔
226
  if (NULL == param) {
203!
227
    return;
×
228
  }
229
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
203✔
230
  cleanupBasicInfo(&pInfo->binfo);
203✔
231

232
  taosArrayDestroyEx(pInfo->pOutputBlocks, extWinDestroyBlockList);
203✔
233
  taosArrayDestroy(pInfo->pWins);
203✔
234
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
203✔
235
  taosArrayDestroy(pInfo->pWinRowIdx);
203✔
236
  
237
  taosArrayDestroy(pInfo->pPseudoColInfo);
203✔
238
  blockDataDestroy(pInfo->pTmpBlock);
203✔
239
  blockDataDestroy(pInfo->pEmptyInputBlock);
203✔
240

241
  extWinDestroyBlkNode(pInfo, pInfo->pLastBlkNode);
203✔
242
  if (pInfo->pFreeBlocks) {
203!
243
    SListNode *node;
244
    while ((node = TD_DLIST_HEAD(pInfo->pFreeBlocks)) != NULL) {
×
245
      TD_DLIST_POP(pInfo->pFreeBlocks, node);
×
246
      extWinDestroyBlkNode(pInfo, node);
×
247
    }
248
    taosMemoryFree(pInfo->pFreeBlocks);
×
249
  }
250
  
251
  cleanupAggSup(&pInfo->aggSup);
203✔
252
  cleanupExprSupp(&pInfo->scalarSupp);
203✔
253
  taosMemoryFreeClear(pInfo->pResultRow);
203!
254

255
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
203✔
256

257
  qDebug("ext window stat at destroy, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
203✔
258
      pInfo->stat.resBlockCreated, pInfo->stat.resBlockDestroyed, pInfo->stat.resBlockRecycled, 
259
      pInfo->stat.resBlockReused, pInfo->stat.resBlockAppend);
260

261
  taosMemoryFreeClear(pInfo);
203!
262
}
263

264
static int32_t extWinOpen(SOperatorInfo* pOperator);
265
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
266

267
typedef struct SMergeAlignedExternalWindowOperator {
268
  SExternalWindowOperator* pExtW;
269
  int64_t curTs;
270
  SResultRow*  pResultRow;
271
} SMergeAlignedExternalWindowOperator;
272

273
void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
4✔
274
  SMergeAlignedExternalWindowOperator* pMlExtInfo = (SMergeAlignedExternalWindowOperator*)pOperator;
4✔
275
  destroyExternalWindowOperatorInfo(pMlExtInfo->pExtW);
4✔
276
}
4✔
277

278
int64_t* extWinExtractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
10,527✔
279
  TSKEY* tsCols = NULL;
10,527✔
280

281
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
10,527!
282
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
10,527✔
283
    if (!pColDataInfo) {
10,527!
284
      pTaskInfo->code = terrno;
×
285
      T_LONG_JMP(pTaskInfo->env, terrno);
×
286
    }
287

288
    tsCols = (int64_t*)pColDataInfo->pData;
10,527✔
289
    if (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0) {
10,527!
290
      int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
10,527✔
291
      if (code != TSDB_CODE_SUCCESS) {
10,527!
292
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
293
        pTaskInfo->code = code;
×
294
        T_LONG_JMP(pTaskInfo->env, code);
×
295
      }
296
    }
297
  }
298

299
  return tsCols;
10,527✔
300
}
301

302
static int32_t extWinGetCurWinIdx(SExecTaskInfo* pTaskInfo) {
62,416✔
303
  return pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
62,416✔
304
}
305

306
static void extWinIncCurWinIdx(SOperatorInfo* pOperator) {
×
307
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
308
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx++;
×
309
}
×
310

311
static void extWinSetCurWinIdx(SOperatorInfo* pOperator, int32_t idx) {
61,456✔
312
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
61,456✔
313
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
61,456✔
314
}
61,456✔
315

316

317
static void extWinIncCurWinOutIdx(SStreamRuntimeInfo* pStreamRuntimeInfo) {
×
318
  pStreamRuntimeInfo->funcInfo.curOutIdx++;
×
319
}
×
320

321

322
static const STimeWindow* extWinGetNextWin(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
×
323
  int32_t curIdx = extWinGetCurWinIdx(pTaskInfo);
×
324
  if (curIdx + 1 >= pExtW->pWins->size) return NULL;
×
325
  return taosArrayGet(pExtW->pWins, curIdx + 1);
×
326
}
327

328

329
static int32_t extWinAppendWinIdx(SExecTaskInfo*       pTaskInfo, SArray* pIdx, SSDataBlock* pBlock, int32_t currWinIdx, int32_t rows) {
858✔
330
  int32_t  code = 0, lino = 0;
858✔
331
  int64_t* lastRes = taosArrayGetLast(pIdx);
858✔
332
  int32_t* lastWinIdx = (int32_t*)lastRes;
858✔
333
  int32_t* lastRowIdx = lastWinIdx ? (lastWinIdx + 1) : NULL;
858✔
334
  int64_t  res = 0;
858✔
335
  int32_t* pWinIdx = (int32_t*)&res;
858✔
336
  int32_t* pRowIdx = pWinIdx + 1;
858✔
337

338
  if (lastWinIdx && *lastWinIdx == currWinIdx) {
858!
339
    return code;
×
340
  }
341

342
  *pWinIdx = currWinIdx;
858✔
343
  *pRowIdx = pBlock->info.rows - rows;
858✔
344

345
  TSDB_CHECK_NULL(taosArrayPush(pIdx, &res), code, lino, _exit, terrno);
858!
346

347
_exit:
858✔
348

349
  if (code) {
858!
350
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
351
  }
352

353
  return code;
858✔
354
}
355

356

357
static int32_t mergeAlignExtWinSetOutputBuf(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
3✔
358
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
359
  if (*pResult == NULL) {
3!
360
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
3✔
361
    if (!*pResult) {
3!
362
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
363
      return terrno;
×
364
    }
365
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
3✔
366
  }
367
  
368
  (*pResult)->win = *pWin;
3✔
369
  (*pResult)->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
3✔
370
  
371
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
3✔
372
}
373

374

375
static int32_t mergeAlignExtWinGetWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts, STimeWindow** ppWin) {
3✔
376
  int32_t blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
3✔
377
  
378
  // TODO handle desc order
379
  for (int32_t i = blkWinIdx; i < pExtW->pWins->size; ++i) {
3!
380
    STimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
3✔
381
    if (ts == pWin->skey) {
3!
382
      extWinSetCurWinIdx(pOperator, i);
3✔
383
      *ppWin = pWin;
3✔
384
      return TSDB_CODE_SUCCESS;
3✔
385
    } else if (ts < pWin->skey) {
×
386
      qError("invalid ts %" PRId64 " for current window idx %d skey %" PRId64, ts, i, pWin->skey);
×
387
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
388
    }
389
  }
390
  
391
  qError("invalid ts %" PRId64 " to find merge aligned ext window, size:%d", ts, (int32_t)pExtW->pWins->size);
×
392
  return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
393
}
394

395
static int32_t mergeAlignExtWinFinalizeResult(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pResultBlock) {
3✔
396
  int32_t        code = 0, lino = 0;
3✔
397
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
3✔
398
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
3✔
399
  SExprSupp*     pSup = &pOperator->exprSupp;
3✔
400
  SResultRow*  pResultRow = pMlExtInfo->pResultRow;
3✔
401
  
402
  finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pOperator->pTaskInfo);
3✔
403
  
404
  if (pResultRow->numOfRows > 0) {
3!
405
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResultBlock, pResultRow->winIdx, pResultRow->numOfRows));
3!
406
  }
407

408
_exit:
3✔
409

410
  if (code) {
3!
411
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
412
  }
413

414
  return code;
3✔
415
}
416

417
static int32_t mergeAlignExtWinAggDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
3✔
418
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
3✔
419
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
3✔
420

421
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3✔
422
  SExprSupp*     pSup = &pOperator->exprSupp;
3✔
423
  int32_t        code = 0, lino = 0;
3✔
424
  STimeWindow *pWin = NULL;
3✔
425

426
  int32_t startPos = 0;
3✔
427
  int64_t* tsCols = extWinExtractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
3✔
428
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
3✔
429
  
430
  code = mergeAlignExtWinGetWinFromTs(pOperator, pExtW, ts, &pWin);
3✔
431
  if (code) {
3!
432
    qError("failed to get time window for ts:%" PRId64 ", prim ts index:%d, error:%s", ts, pExtW->primaryTsIndex, tstrerror(code));
×
433
    TAOS_CHECK_EXIT(code);
×
434
  }
435

436
  if (pMlExtInfo->curTs != INT64_MIN && pMlExtInfo->curTs != pWin->skey) {
3!
437
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
×
438
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
×
439
  }
440
  
441
  TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));
3!
442

443
  int32_t currPos = startPos;
3✔
444
  pMlExtInfo->curTs = pWin->skey;
3✔
445
  
446
  while (++currPos < pBlock->info.rows) {
3!
447
    if (tsCols[currPos] == pMlExtInfo->curTs) continue;
×
448

449
    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
×
450
      pMlExtInfo->curTs, startPos, currPos, tsCols[currPos]); 
451
    TAOS_CHECK_EXIT(applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
×
452
                                           currPos - startPos, pBlock->info.rows, pSup->numOfExprs));
453

454
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
×
455
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
×
456

457
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[currPos], &pWin));
×
458
    
459
    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
×
460
    startPos = currPos;
×
461
    
462
    TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup));
×
463

464
    pMlExtInfo->curTs = pWin->skey;
×
465
  }
466

467
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
3✔
468
                                         currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
3✔
469

470
_exit:
3✔
471

472
  if (code != 0) {
3!
473
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
474
    T_LONG_JMP(pTaskInfo->env, code);
×
475
  }
476
  
477
  return code;
3✔
478
}
479

480
static int32_t mergeAlignExtWinBuildWinRowIdx(SOperatorInfo* pOperator, SSDataBlock* pInput, SSDataBlock* pResult) {
×
481
  SExternalWindowOperator* pExtW = pOperator->info;
×
482
  int64_t* tsCols = extWinExtractTsCol(pInput, pExtW->primaryTsIndex, pOperator->pTaskInfo);
×
483
  STimeWindow* pWin = NULL;
×
484
  int32_t code = 0, lino = 0;
×
485
  int64_t prevTs = INT64_MIN;
×
486
  
487
  for (int32_t i = 0; i < pInput->info.rows; ++i) {
×
488
    if (prevTs == tsCols[i]) {
×
489
      continue;
×
490
    }
491
    
492
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[i], &pWin));
×
493
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResult, extWinGetCurWinIdx(pOperator->pTaskInfo), pInput->info.rows - i));
×
494

495
    prevTs = tsCols[i];
×
496
  }
497

498
_exit:
×
499

500
  if (code != 0) {
×
501
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
502
  }
503

504
  return code;  
×
505
}
506

507
static int32_t mergeAlignExtWinProjectDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
508
                                            SSDataBlock* pResultBlock) {
509
  SExternalWindowOperator* pExtW = pOperator->info;
×
510
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
×
511
  int32_t                  code = 0, lino = 0;
×
512
  
513
  TAOS_CHECK_EXIT(projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
×
514
                        GET_STM_RTINFO(pOperator->pTaskInfo)));
515

516
  TAOS_CHECK_EXIT(mergeAlignExtWinBuildWinRowIdx(pOperator, pBlock, pResultBlock));
×
517

518
_exit:
×
519

520
  if (code != 0) {
×
521
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
522
  }
523

524
  return code;
×
525
}
526

527
void mergeAlignExtWinDo(SOperatorInfo* pOperator) {
4✔
528
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
4✔
529
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
4✔
530
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
4✔
531
  SResultRow*                          pResultRow = NULL;
4✔
532
  int32_t                              code = 0;
4✔
533
  SSDataBlock*                         pRes = pExtW->binfo.pRes;
4✔
534
  SExprSupp*                           pSup = &pOperator->exprSupp;
4✔
535
  int32_t                              lino = 0;
4✔
536

537
  taosArrayClear(pExtW->pWinRowIdx);
4✔
538
  blockDataCleanup(pRes);
4✔
539

540
  while (1) {
3✔
541
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
7✔
542

543
    if (pBlock == NULL) {
7✔
544
      // close last time window
545
      if (pMlExtInfo->curTs != INT64_MIN && EEXT_MODE_AGG == pExtW->mode) {
4!
546
        TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, &pExtW->binfo.resultRowInfo, pRes));
3!
547
      }
548
      setOperatorCompleted(pOperator);
4✔
549
      break;
4✔
550
    }
551

552
    pRes->info.scanFlag = pBlock->info.scanFlag;
3✔
553
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
3✔
554
    QUERY_CHECK_CODE(code, lino, _exit);
3!
555

556
    printDataBlock(pBlock, __func__, "externalwindowAlign", pTaskInfo->id.queryId);
3✔
557
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);
3!
558

559
    if (EEXT_MODE_SCALAR == pExtW->mode) {
3!
560
      TAOS_CHECK_EXIT(mergeAlignExtWinProjectDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
×
561
    } else {
562
      TAOS_CHECK_EXIT(mergeAlignExtWinAggDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
3!
563
    }
564

565
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
3!
566
      break;
×
567
    }
568
  }
569

570
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;
4✔
571
  
572
_exit:
4✔
573

574
  if (code != 0) {
4!
575
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
576
    pTaskInfo->code = code;
×
577
    T_LONG_JMP(pTaskInfo->env, code);
×
578
  }
579
}
4✔
580

581
static int32_t mergeAlignExtWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
7✔
582
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
7✔
583
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
7✔
584
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
7✔
585
  int32_t                              code = 0;
7✔
586
  int32_t lino = 0;
7✔
587

588
  if (pOperator->status == OP_EXEC_DONE) {
7✔
589
    (*ppRes) = NULL;
3✔
590
    return TSDB_CODE_SUCCESS;
3✔
591
  }
592

593
  SSDataBlock* pRes = pExtW->binfo.pRes;
4✔
594
  blockDataCleanup(pRes);
4✔
595

596
  if (taosArrayGetSize(pExtW->pWins) <= 0) {
4!
597
    size_t size = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
4✔
598
    STimeWindow* pWin = taosArrayReserve(pExtW->pWins, size);
4✔
599
    TSDB_CHECK_NULL(pWin, code, lino, _exit, terrno);
4!
600

601
    for (int32_t i = 0; i < size; ++i) {
8✔
602
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
4✔
603
      pWin[i].skey = pParam->wstart;
4✔
604
      pWin[i].ekey = pParam->wstart + 1;
4✔
605
    }
606
    
607
    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
4✔
608
  }
609

610
  mergeAlignExtWinDo(pOperator);
4✔
611
  
612
  size_t rows = pRes->info.rows;
4✔
613
  pOperator->resultInfo.totalRows += rows;
4✔
614
  (*ppRes) = (rows == 0) ? NULL : pRes;
4✔
615

616
_exit:
4✔
617

618
  if (code != 0) {
4!
619
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
620
    pTaskInfo->code = code;
×
621
    T_LONG_JMP(pTaskInfo->env, code);
×
622
  }
623
  return code;
4✔
624
}
625

626
int32_t resetMergeAlignedExtWinOperator(SOperatorInfo* pOperator) {
4✔
627
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
4✔
628
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
4✔
629
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
4✔
630
  SMergeAlignedIntervalPhysiNode * pPhynode = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;
4✔
631
  pOperator->status = OP_NOT_OPENED;
4✔
632

633
  taosArrayClear(pExtW->pWins);
4✔
634

635
  resetBasicOperatorState(&pExtW->binfo);
4✔
636
  pMlExtInfo->pResultRow = NULL;
4✔
637
  pMlExtInfo->curTs = INT64_MIN;
4✔
638

639
  int32_t code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
4✔
640
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
4✔
641
                             &pTaskInfo->storageAPI.functionStore);
642
  if (code == 0) {
4!
643
    colDataDestroy(&pExtW->twAggSup.timeWindowData);
4✔
644
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
4✔
645
  }
646
  return code;
4✔
647
}
648

649
int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
4✔
650
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
651
  SMergeAlignedIntervalPhysiNode* pPhynode = (SMergeAlignedIntervalPhysiNode*)pNode;
4✔
652
  int32_t code = 0;
4✔
653
  int32_t lino = 0;
4✔
654
  SMergeAlignedExternalWindowOperator* pMlExtInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
4!
655
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4!
656

657
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
4!
658
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
4✔
659
  }
660
  pOperator->pPhyNode = pNode;
4✔
661
  if (!pMlExtInfo || !pOperator) {
4!
662
    code = terrno;
×
663
    goto _error;
×
664
  }
665

666
  pMlExtInfo->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
4!
667
  if (!pMlExtInfo->pExtW) {
4!
668
    code = terrno;
×
669
    goto _error;
×
670
  }
671

672
  SExternalWindowOperator* pExtW = pMlExtInfo->pExtW;
4✔
673
  SExprSupp* pSup = &pOperator->exprSupp;
4✔
674
  pSup->hasWindowOrGroup = true;
4✔
675
  pMlExtInfo->curTs = INT64_MIN;
4✔
676

677
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
4✔
678
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
4!
679
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
4✔
680
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
4✔
681

682
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
4✔
683
  initResultSizeInfo(&pOperator->resultInfo, 4096);
4✔
684

685
  int32_t num = 0;
4✔
686
  SExprInfo* pExprInfo = NULL;
4✔
687
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
4✔
688
  QUERY_CHECK_CODE(code, lino, _error);
4!
689

690
  if (pExtW->mode == EEXT_MODE_AGG) {
4!
691
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
4✔
692
                      &pTaskInfo->storageAPI.functionStore);
693
    QUERY_CHECK_CODE(code, lino, _error);
4!
694
  }
695

696
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
4✔
697
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
4!
698
  initBasicInfo(&pExtW->binfo, pResBlock);
4✔
699

700
  pExtW->pWins = taosArrayInit(4096, sizeof(STimeWindow));
4✔
701
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
4!
702

703
  pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
4✔
704
  TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
4!
705

706
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
4✔
707
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
4✔
708
  QUERY_CHECK_CODE(code, lino, _error);
4!
709
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMlExtInfo, pTaskInfo);
4✔
710
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignExtWinNext, NULL,
4✔
711
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
712
                                         optrDefaultGetNextExtFn, NULL);
713
  setOperatorResetStateFn(pOperator, resetMergeAlignedExtWinOperator);
4✔
714

715
  code = appendDownstream(pOperator, &pDownstream, 1);
4✔
716
  QUERY_CHECK_CODE(code, lino, _error);
4!
717
  *ppOptrOut = pOperator;
4✔
718
  return code;
4✔
719
  
720
_error:
×
721
  if (pMlExtInfo) destroyMergeAlignedExternalWindowOperator(pMlExtInfo);
×
722
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
723
  pTaskInfo->code = code;
×
724
  return code;
×
725
}
726

727
static int32_t resetExternalWindowExprSupp(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
343✔
728
                                           SExternalWindowPhysiNode* pPhynode) {
729
  int32_t    code = 0, lino = 0, num = 0;
343✔
730
  SExprInfo* pExprInfo = NULL;
343✔
731
  cleanupExprSuppWithoutFilter(&pExtW->scalarSupp);
343✔
732

733
  SNodeList* pNodeList = NULL;
343✔
734
  if (pPhynode->window.pProjs) {
343!
735
    pNodeList = pPhynode->window.pProjs;
×
736
  } else {
737
    pNodeList = pPhynode->window.pExprs;
343✔
738
  }
739

740
  code = createExprInfo(pNodeList, NULL, &pExprInfo, &num);
343✔
741
  QUERY_CHECK_CODE(code, lino, _error);
343!
742
  code = initExprSupp(&pExtW->scalarSupp, pExprInfo, num, &pTaskInfo->storageAPI.functionStore);
343✔
743
  QUERY_CHECK_CODE(code, lino, _error);
343!
744
  return code;
343✔
745
_error:
×
746
  if (code != TSDB_CODE_SUCCESS) {
×
747
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
748
    pTaskInfo->code = code;
×
749
  }
750
  return code;
×
751
}
752

753
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
343✔
754
  int32_t code = 0, lino = 0;
343✔
755
  SExternalWindowOperator* pExtW = pOperator->info;
343✔
756
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
343✔
757
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pOperator->pPhyNode;
343✔
758
  pOperator->status = OP_NOT_OPENED;
343✔
759

760
  //resetBasicOperatorState(&pExtW->binfo);
761
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
343✔
762

763
  pExtW->outputWinId = 0;
343✔
764
  pExtW->lastWinId = -1;
343✔
765
  taosArrayClear(pExtW->pWins);
343✔
766
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);
343✔
767

768
/*
769
  int32_t code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
770
  if (code == 0) {
771
    code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
772
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
773
                       &pTaskInfo->storageAPI.functionStore);
774
  }
775
*/
776
  TAOS_CHECK_EXIT(resetExternalWindowExprSupp(pExtW, pTaskInfo, pPhynode));
343!
777
  colDataDestroy(&pExtW->twAggSup.timeWindowData);
343✔
778
  TAOS_CHECK_EXIT(initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window));
343!
779

780
  pExtW->outWinIdx = 0;
343✔
781
  pExtW->lastSKey = INT64_MIN;
343✔
782

783
  qDebug("%s ext window stat at reset, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
343✔
784
      pTaskInfo->id.str, pExtW->stat.resBlockCreated, pExtW->stat.resBlockDestroyed, pExtW->stat.resBlockRecycled, 
785
      pExtW->stat.resBlockReused, pExtW->stat.resBlockAppend);
786

787
_exit:
30✔
788

789
  if (code) {
343!
790
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
791
  }
792
  
793
  return code;
343✔
794
}
795

796
static EDealRes extWinHasCountLikeFunc(SNode* pNode, void* res) {
1,585✔
797
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
1,585✔
798
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
561✔
799
    if (fmIsCountLikeFunc(pFunc->funcId) || (pFunc->hasOriginalFunc && fmIsCountLikeFunc(pFunc->originalFuncId))) {
561!
800
      *(bool*)res = true;
163✔
801
      return DEAL_RES_END;
163✔
802
    }
803
  }
804
  return DEAL_RES_CONTINUE;
1,422✔
805
}
806

807

808
static int32_t extWinCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
163✔
809
  int32_t code = TSDB_CODE_SUCCESS;
163✔
810
  int32_t lino = 0;
163✔
811
  SSDataBlock* pBlock = NULL;
163✔
812
  if (!tsCountAlwaysReturnValue) {
163!
813
    return TSDB_CODE_SUCCESS;
×
814
  }
815

816
  SExternalWindowOperator* pExtW = pOperator->info;
163✔
817

818
  if (!pExtW->hasCountFunc) {
163!
819
    return TSDB_CODE_SUCCESS;
×
820
  }
821

822
  code = createDataBlock(&pBlock);
163✔
823
  if (code) {
163!
824
    return code;
×
825
  }
826

827
  pBlock->info.rows = 1;
163✔
828
  pBlock->info.capacity = 0;
163✔
829

830
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
838✔
831
    SColumnInfoData colInfo = {0};
675✔
832
    colInfo.hasNull = true;
675✔
833
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
675✔
834
    colInfo.info.bytes = 1;
675✔
835

836
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
675✔
837
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
1,414✔
838
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
739✔
839
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
739✔
840
        int32_t slotId = pFuncParam->pCol->slotId;
523✔
841
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
523✔
842
        if (slotId >= numOfCols) {
523✔
843
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
337✔
844
          QUERY_CHECK_CODE(code, lino, _end);
337!
845

846
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
690✔
847
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
353✔
848
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
353!
849
          }
850
        }
851
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
216✔
852
        // do nothing
853
      }
854
    }
855
  }
856

857
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
163✔
858
  QUERY_CHECK_CODE(code, lino, _end);
163!
859

860
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
516✔
861
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
353✔
862
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
353!
863
    colDataSetNULL(pColInfoData, 0);
864
  }
865
  *ppBlock = pBlock;
163✔
866

867
_end:
163✔
868
  if (code != TSDB_CODE_SUCCESS) {
163!
869
    blockDataDestroy(pBlock);
×
870
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
871
  }
872
  return code;
163✔
873
}
874

875

876

877
static int extWinTsWinCompare(const void* pLeft, const void* pRight) {
16,158✔
878
  int64_t ts = *(int64_t*)pLeft;
16,158✔
879
  SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)pRight;
16,158✔
880
  if (ts < pWin->tw.skey) {
16,158✔
881
    return -1;
5,715✔
882
  }
883
  if (ts >= pWin->tw.ekey) {
10,443✔
884
    return 1;
302✔
885
  }
886

887
  return 0;
10,141✔
888
}
889

890

891
static int32_t extWinGetMultiTbWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol, int64_t rowNum, int32_t* startPos) {
5,843✔
892
  int32_t idx = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
5,843✔
893
  if (idx >= 0) {
5,843✔
894
    *startPos = 0;
5,841✔
895
    return idx;
5,841✔
896
  }
897

898
  SExtWinTimeWindow* pWin = NULL;
2✔
899
  int32_t w = 0;
2✔
900
  for (int64_t i = 1; i < rowNum; ++i) {
4✔
901
    for (; w < pExtW->pWins->size; ++w) {
4!
902
      pWin = TARRAY_GET_ELEM(pExtW->pWins, w);
4✔
903
      if (tsCol[i] < pWin->tw.skey) {
4✔
904
        break;
2✔
905
      }
906
      
907
      if (tsCol[i] < pWin->tw.ekey) {
2!
908
        *startPos = i;
×
909
        return w;
×
910
      }
911
    }
912
  }
913

914
  return -1;
2✔
915
}
916

917
static int32_t extWinGetNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
288✔
918
  SExternalWindowOperator* pExtW = pOperator->info;
288✔
919
  if ((*startPos) >= pInfo->rows) {
288✔
920
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
72✔
921
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
922
    *ppWin = NULL;
72✔
923
    return TSDB_CODE_SUCCESS;
72✔
924
  }
925
  
926
  if (pExtW->blkWinIdx < 0) {
216✔
927
    pExtW->blkWinIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
72✔
928
  } else {
929
    pExtW->blkWinIdx++;
144✔
930
  }
931

932
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
216!
933
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
×
934
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
935
    *ppWin = NULL;
×
936
    return TSDB_CODE_SUCCESS;
×
937
  }
938
  
939
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
216✔
940
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
216!
941
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
942
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
943
    *ppWin = NULL;
×
944
    return TSDB_CODE_SUCCESS;
×
945
  }
946

947
  int32_t r = *startPos;
216✔
948

949
  qDebug("%s %s start to get novlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, r);
216✔
950

951
  // TODO handle desc order
952
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
240!
953
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
240✔
954
    for (; r < pInfo->rows; ++r) {
265!
955
      if (tsCol[r] < pWin->tw.skey) {
265✔
956
        continue;
25✔
957
      }
958

959
      if (tsCol[r] < pWin->tw.ekey) {
240✔
960
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
216✔
961
        *ppWin = pWin;
216✔
962
        *startPos = r;
216✔
963
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
216✔
964

965
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
216✔
966
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
967
        
968
        return TSDB_CODE_SUCCESS;
216✔
969
      }
970

971
      break;
24✔
972
    }
973

974
    if (r == pInfo->rows) {
24!
975
      break;
×
976
    }
977
  }
978

979
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
×
980
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
981

982
  *ppWin = NULL;
×
983
  return TSDB_CODE_SUCCESS;
×
984
}
985

986
static int32_t extWinGetOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
20✔
987
  SExternalWindowOperator* pExtW = pOperator->info;
20✔
988
  if (pExtW->blkWinIdx < 0) {
20✔
989
    pExtW->blkWinIdx = pExtW->blkWinStartIdx;
9✔
990
  } else {
991
    pExtW->blkWinIdx++;
11✔
992
  }
993

994
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
20✔
995
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
8!
996
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
997
    *ppWin = NULL;
8✔
998
    return TSDB_CODE_SUCCESS;
8✔
999
  }
1000
  
1001
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
12✔
1002
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
12!
1003
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
1004
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1005
    *ppWin = NULL;
×
1006
    return TSDB_CODE_SUCCESS;
×
1007
  }
1008

1009
  int64_t r = 0;
12✔
1010

1011
  qDebug("%s %s start to get ovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);
12!
1012
  
1013
  // TODO handle desc order
1014
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
28!
1015
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
28✔
1016
    for (r = pExtW->blkRowStartIdx; r < pInfo->rows; ++r) {
136✔
1017
      if (tsCol[r] < pWin->tw.skey) {
135✔
1018
        pExtW->blkRowStartIdx = r + 1;
108✔
1019
        continue;
108✔
1020
      }
1021

1022
      if (tsCol[r] < pWin->tw.ekey) {
27✔
1023
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
11✔
1024
        *ppWin = pWin;
11✔
1025
        *startPos = r;
11✔
1026
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
11✔
1027

1028
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
11!
1029
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);
1030
        
1031
        if ((r + *winRows) < pInfo->rows) {
11✔
1032
          pExtW->blkWinStartIdx = pExtW->blkWinIdx + 1;
3✔
1033
          pExtW->blkWinStartSet = true;
3✔
1034
        }
1035
        
1036
        return TSDB_CODE_SUCCESS;
11✔
1037
      }
1038

1039
      break;
16✔
1040
    }
1041

1042
    if (r >= pInfo->rows) {
17✔
1043
      if (!pExtW->blkWinStartSet) {
1!
1044
        pExtW->blkWinStartIdx = pExtW->blkWinIdx;
1✔
1045
      }
1046
      
1047
      break;
1✔
1048
    }
1049
  }
1050

1051
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
1!
1052
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1053

1054
  *ppWin = NULL;
1✔
1055
  return TSDB_CODE_SUCCESS;
1✔
1056
}
1057

1058

1059
static int32_t extWinGetMultiTbNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
30,218✔
1060
  SExternalWindowOperator* pExtW = pOperator->info;
30,218✔
1061
  if ((*startPos) >= pInfo->rows) {
30,218✔
1062
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
5,841!
1063
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
1064
    *ppWin = NULL;
5,841✔
1065
    return TSDB_CODE_SUCCESS;
5,841✔
1066
  }
1067
  
1068
  if (pExtW->blkWinIdx < 0) {
24,377✔
1069
    pExtW->blkWinIdx = extWinGetMultiTbWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
5,843✔
1070
    if (pExtW->blkWinIdx < 0) {
5,843✔
1071
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
2!
1072
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1073
      *ppWin = NULL;
2✔
1074
      return TSDB_CODE_SUCCESS;
2✔
1075
    }
1076

1077
    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
5,841✔
1078
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
5,841✔
1079
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
5,841✔
1080

1081
    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
5,841!
1082
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);
1083
    
1084
    return TSDB_CODE_SUCCESS;
5,841✔
1085
  } else {
1086
    pExtW->blkWinIdx++;
18,534✔
1087
  }
1088

1089
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
18,534!
1090
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
×
1091
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1092
    *ppWin = NULL;
×
1093
    return TSDB_CODE_SUCCESS;
×
1094
  }
1095
  
1096
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
18,534✔
1097
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
18,535!
1098
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
1099
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1100
    *ppWin = NULL;
×
1101
    return TSDB_CODE_SUCCESS;
×
1102
  }
1103

1104
  int32_t r = *startPos;
18,535✔
1105

1106
  qDebug("%s %s start to get mnovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, r);
18,535!
1107

1108
  // TODO handle desc order
1109
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
18,538!
1110
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
18,538✔
1111
    for (; r < pInfo->rows; ++r) {
18,545!
1112
      if (tsCol[r] < pWin->tw.skey) {
18,545✔
1113
        continue;
6✔
1114
      }
1115

1116
      if (tsCol[r] < pWin->tw.ekey) {
18,539!
1117
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
18,539✔
1118
        *ppWin = pWin;
18,538✔
1119
        *startPos = r;
18,538✔
1120
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
18,538✔
1121

1122
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
18,536!
1123
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
1124
        
1125
        return TSDB_CODE_SUCCESS;
18,540✔
1126
      }
1127

1128
      break;
×
1129
    }
1130

1131
    if (r == pInfo->rows) {
×
1132
      break;
×
1133
    }
1134
  }
1135

1136
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
×
1137
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1138

1139
  *ppWin = NULL;
×
1140
  return TSDB_CODE_SUCCESS;
×
1141
}
1142

1143
static int32_t extWinGetFirstWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, int64_t* tsCol,
4,600✔
1144
                                       int64_t rowNum, int32_t* startPos) {
1145
  SExtWinTimeWindow* pWin = NULL;
4,600✔
1146
  int32_t            idx = taosArraySearchIdx(pExtW->pWins, tsCol, extWinTsWinCompare, TD_EQ);
4,600✔
1147
  if (idx >= 0) {
4,600✔
1148
    for (int i = idx - 1; i >= 0; --i) {
4,300!
1149
      pWin = TARRAY_GET_ELEM(pExtW->pWins, i);
×
1150
      if (extWinTsWinCompare(tsCol, pWin) == 0) {
×
1151
        idx = i;
×
1152
      } else {
1153
        break;
×
1154
      }
1155
    }
1156
    *startPos = 0;
4,300✔
1157
    return idx;
4,300✔
1158
  }
1159

1160
  pWin = NULL;
300✔
1161
  int32_t w = 0;
300✔
1162
  for (int64_t i = 1; i < rowNum; ++i) {
602✔
1163
    for (; w < pExtW->pWins->size; ++w) {
702!
1164
      pWin = TARRAY_GET_ELEM(pExtW->pWins, w);
702✔
1165
      if (tsCol[i] < pWin->tw.skey) {
702✔
1166
        break;
302✔
1167
      }
1168

1169
      if (tsCol[i] < pWin->tw.ekey) {
400✔
1170
        *startPos = i;
100✔
1171
        return w;
100✔
1172
      }
1173
    }
1174
  }
1175

1176
  return -1;
200✔
1177
}
1178

1179
static int32_t extWinGetMultiTbOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
10,600✔
1180
  SExternalWindowOperator* pExtW = pOperator->info;
10,600✔
1181
  if (pExtW->blkWinIdx < 0) {
10,600✔
1182
    pExtW->blkWinIdx = extWinGetFirstWinFromTs(pOperator, pExtW, tsCol, pInfo->rows, startPos);
4,600✔
1183
    if (pExtW->blkWinIdx < 0) {
4,600✔
1184
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
200!
1185
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1186
      *ppWin = NULL;
200✔
1187
      return TSDB_CODE_SUCCESS;
200✔
1188
    }
1189

1190
    extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
4,400✔
1191
    *ppWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
4,400✔
1192
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
4,400✔
1193
    
1194
    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
4,400!
1195
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);
1196
    
1197
    return TSDB_CODE_SUCCESS;
4,399✔
1198
  } else {
1199
    pExtW->blkWinIdx++;
6,000✔
1200
  }
1201

1202
  if (pExtW->blkWinIdx >= pExtW->pWins->size) {
6,000✔
1203
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
4,100!
1204
        GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, (int32_t)pExtW->pWins->size);
1205
    *ppWin = NULL;
4,099✔
1206
    return TSDB_CODE_SUCCESS;
4,099✔
1207
  }
1208
  
1209
  SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
1,900✔
1210
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
1,900✔
1211
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
300!
1212
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pExtW->blkWinIdx, pWin->tw.skey);
1213
    *ppWin = NULL;
300✔
1214
    return TSDB_CODE_SUCCESS;
300✔
1215
  }
1216

1217
  int64_t r = 0;
1,600✔
1218

1219
  qDebug("%s %s start to get movlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pExtW->blkRowStartIdx);
1,600!
1220

1221
  // TODO handle desc order
1222
  for (; pExtW->blkWinIdx < pExtW->pWins->size; ++pExtW->blkWinIdx) {
1,600!
1223
    pWin = taosArrayGet(pExtW->pWins, pExtW->blkWinIdx);
1,600✔
1224
    for (r = pExtW->blkRowStartIdx; r < pInfo->rows; ++r) {
5,304!
1225
      if (tsCol[r] < pWin->tw.skey) {
5,304✔
1226
        pExtW->blkRowStartIdx = r + 1;
3,704✔
1227
        continue;
3,704✔
1228
      }
1229

1230
      if (tsCol[r] < pWin->tw.ekey) {
1,600!
1231
        extWinSetCurWinIdx(pOperator, pExtW->blkWinIdx);
1,600✔
1232
        *ppWin = pWin;
1,600✔
1233
        *startPos = r;
1,600✔
1234
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
1,600✔
1235

1236
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
1,600!
1237
            GET_TASKID(pOperator->pTaskInfo), __func__, pExtW->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);
1238
        
1239
        return TSDB_CODE_SUCCESS;
1,600✔
1240
      }
1241

1242
      break;
×
1243
    }
1244

1245
    if (r >= pInfo->rows) {
×
1246
      break;
×
1247
    }
1248
  }
1249

1250
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
×
1251
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1252

1253
  *ppWin = NULL;
×
1254
  return TSDB_CODE_SUCCESS;
×
1255
}
1256

1257

1258
static int32_t extWinGetWinStartPos(STimeWindow win, const SDataBlockInfo* pBlockInfo, int32_t lastEndPos, int32_t order, int32_t* nextPos, int64_t* tsCol) {
×
1259
  bool ascQuery = order == TSDB_ORDER_ASC;
×
1260

1261
  if (win.ekey <= pBlockInfo->window.skey && ascQuery) {
×
1262
    return -2;
×
1263
  }
1264
//if (win.skey > pBlockInfo->window.ekey && !ascQuery) return -2;
1265

1266
  if (win.skey > pBlockInfo->window.ekey && ascQuery) return -1;
×
1267
//if (win.ekey < pBlockInfo->window.skey && !ascQuery) return -1;
1268

1269
  while (true) {
1270
    if (win.ekey <= tsCol[lastEndPos + 1] && ascQuery) return -2;
×
1271
    if (win.skey <= tsCol[lastEndPos + 1] && ascQuery) break;
×
1272
    lastEndPos++;
×
1273
  }
1274

1275
  *nextPos = lastEndPos + 1;
×
1276
  return 0;
×
1277
}
1278

1279
static int32_t extWinAggSetWinOutputBuf(SOperatorInfo* pOperator, SExtWinTimeWindow* win, SExprSupp* pSupp, 
25,017✔
1280
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
1281
  int32_t code = 0, lino = 0;
25,017✔
1282
  SResultRow* pResultRow = NULL;
25,017✔
1283
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
25,017✔
1284
  
1285
#if 0
1286
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
1287
                                                  true, tableGroupId, pTaskInfo, true, pAggSup, true);
1288
  if (pResultRow == NULL) {
1289
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
1290
    return pTaskInfo->code;
1291
  }
1292

1293
  qDebug("current result rows num:%d", tSimpleHashGetSize(pAggSup->pResultRowHashTable));
1294

1295
#else
1296
  if (win->winOutIdx >= 0) {
25,017✔
1297
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
24,162✔
1298
  } else {
1299
    win->winOutIdx = pExtW->outWinIdx++;
855✔
1300
    
1301
    qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
855✔
1302

1303
    pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
855✔
1304
    
1305
    memset(pResultRow, 0, pAggSup->resultRowSize);
855✔
1306

1307
    pResultRow->winIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
855✔
1308
    TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
855✔
1309
  }
1310
#endif
1311

1312
  // set time window for current result
1313
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
25,017!
1314

1315
_exit:
25,015✔
1316
  
1317
  if (code) {
25,015!
1318
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1319
  }
1320

1321
  return code;
25,015✔
1322
}
1323

1324
static int32_t extWinAggDo(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
30,856✔
1325
                                  SSDataBlock* pInputBlock) {
1326
  if (forwardRows == 0) return 0;
30,856!
1327
  SExprSupp*               pSup = &pOperator->exprSupp;
30,856✔
1328
  SExternalWindowOperator* pExtW = pOperator->info;
30,856✔
1329
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
30,856✔
1330
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
30,856✔
1331

1332
}
1333

1334
static bool extWinLastWinClosed(SExternalWindowOperator* pExtW) {
×
1335
  if (pExtW->outWinIdx <= 0 || (pExtW->multiTableMode && !pExtW->inputHasOrder)) {
×
1336
    return false;
×
1337
  }
1338

1339
  if (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc) {
×
1340
    return true;
×
1341
  }
1342

1343
  SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outWinIdx - 1);
×
1344
  if (0 == listNEles(pList)) {
×
1345
    return true;
×
1346
  }
1347

1348
  SListNode* pNode = listTail(pList);
×
1349
  SArray* pBlkWinIdx = *((SArray**)pNode->data + 1);
×
1350
  int64_t* pIdx = taosArrayGetLast(pBlkWinIdx);
×
1351
  if (pIdx && *(int32_t*)pIdx < pExtW->blkWinStartIdx) {
×
1352
    return true;
×
1353
  }
1354

1355
  return false;
×
1356
}
1357

1358
static int32_t extWinGetWinResBlock(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
×
1359
  SExternalWindowOperator* pExtW = pOperator->info;
×
1360
  SList*                   pList = NULL;
×
1361
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1362
  
1363
  if (pWin->winOutIdx >= 0) {
×
1364
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1365
  } else {
1366
    if (extWinLastWinClosed(pExtW)) {
×
1367
      pWin->winOutIdx = pExtW->outWinIdx - 1;
×
1368
      pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1369
    } else {
1370
      pWin->winOutIdx = pExtW->outWinIdx++;
×
1371
      pList = tdListNew(POINTER_BYTES * 2);
×
1372
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);
×
1373
      SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1374
      extWinRecycleBlockList(pExtW, ppList);
×
1375
      *ppList = pList;
×
1376
    }
1377
  }
1378
  
1379
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));
×
1380

1381
_exit:
×
1382

1383
  if (code) {
×
1384
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1385
  }
1386

1387
  return code;
×
1388
}
1389

1390
static int32_t extWinProjectDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
×
1391
  SExternalWindowOperator* pExtW = pOperator->info;
×
1392
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
×
1393
  SSDataBlock*             pResBlock = NULL;
×
1394
  SArray*                  pIdx = NULL;
×
1395
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1396
  
1397
  TAOS_CHECK_EXIT(extWinGetWinResBlock(pOperator, rows, pWin, &pResBlock, &pIdx));
×
1398

1399
  qDebug("%s %s win[%" PRId64 ", %" PRId64 "] got res block %p winRowIdx %p, winOutIdx:%d, capacity:%d", 
×
1400
      pOperator->pTaskInfo->id.str, __func__, pWin->tw.skey, pWin->tw.ekey, pResBlock, pIdx, pWin->winOutIdx, pResBlock->info.capacity);
1401
  
1402
  if (!pExtW->pTmpBlock) {
×
1403
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
×
1404
  } else {
1405
    blockDataCleanup(pExtW->pTmpBlock);
×
1406
  }
1407
  
1408
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
×
1409

1410
  qDebug("%s %s start to copy %d rows to tmp blk", pOperator->pTaskInfo->id.str, __func__, rows);
×
1411
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
×
1412

1413
  qDebug("%s %s start to apply project to tmp blk", pOperator->pTaskInfo->id.str, __func__);
×
1414
  TAOS_CHECK_EXIT(projectApplyFunctionsWithSelect(pExprSup->pExprInfo, pResBlock, pExtW->pTmpBlock, pExprSup->pCtx, pExprSup->numOfExprs,
×
1415
        NULL, GET_STM_RTINFO(pOperator->pTaskInfo), true, pExprSup->hasIndefRowsFunc));
1416

1417
  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));
×
1418

1419
_exit:
×
1420

1421
  if (code) {
×
1422
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1423
  } else {
1424
    qDebug("%s %s project succeed", pOperator->pTaskInfo->id.str, __func__);
×
1425
  }
1426
  
1427
  return code;
×
1428
}
1429

1430
static int32_t extWinProjectOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
×
1431
  SExternalWindowOperator* pExtW = pOperator->info;
×
1432
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
×
1433
  SExtWinTimeWindow*       pWin = NULL;
×
1434
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
×
1435
  int32_t                  startPos = 0, winRows = 0;
×
1436
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1437
  
1438
  while (true) {
1439
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
×
1440
    if (pWin == NULL) {
×
1441
      break;
×
1442
    }
1443

1444
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") project start, ascScan:%d, startPos:%d, winRows:%d",
×
1445
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1446
    
1447
    TAOS_CHECK_EXIT(extWinProjectDo(pOperator, pInputBlock, startPos, winRows, pWin));
×
1448
    
1449
    startPos += winRows;
×
1450
  }
1451
  
1452
_exit:
×
1453

1454
  if (code) {
×
1455
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1456
  }
1457

1458
  return code;
×
1459
}
1460

1461
static int32_t extWinIndefRowsDoImpl(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
×
1462
  SExternalWindowOperator* pExtW = pOperator->info;
×
1463
  SOptrBasicInfo*     pInfo = &pExtW->binfo;
×
1464
  SExprSupp*          pSup = &pOperator->exprSupp;
×
1465
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
×
1466
  int32_t order = pInfo->inputTsOrder;
×
1467
  int32_t scanFlag = pBlock->info.scanFlag;
×
1468
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
×
1469

1470
  SExprSupp* pScalarSup = &pExtW->scalarSupp;
×
1471
  if (pScalarSup->pExprInfo != NULL) {
×
1472
    TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
1473
                                 pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1474
  }
1475

1476
  TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, order, scanFlag, false));
×
1477

1478
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));
×
1479

1480
  TAOS_CHECK_EXIT(projectApplyFunctions(pSup->pExprInfo, pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
×
1481
                               pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1482

1483
_exit:
×
1484

1485
  if (code) {
×
1486
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
1487
  }
1488

1489
  return code;
×
1490
}
1491

1492
static int32_t extWinIndefRowsSetWinOutputBuf(SExternalWindowOperator* pExtW, SExtWinTimeWindow* win, SExprSupp* pSupp, 
×
1493
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo, bool reset) {
1494
  int32_t code = 0, lino = 0;
×
1495
  SResultRow* pResultRow = NULL;
×
1496

1497
  pResultRow = (SResultRow*)((char*)pExtW->pResultRow + win->winOutIdx * pAggSup->resultRowSize);
×
1498
  
1499
  qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->winOutIdx);
×
1500

1501
  if (reset) {
×
1502
    memset(pResultRow, 0, pAggSup->resultRowSize);
×
1503
    for (int32_t k = 0; k < pSupp->numOfExprs; ++k) {
×
1504
      SqlFunctionCtx* pCtx = &pSupp->pCtx[k];
×
1505
      pCtx->pOutput = NULL;
×
1506
    }
1507
  }
1508

1509
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
×
1510

1511
  // set time window for current result
1512
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
×
1513

1514
_exit:
×
1515
  
1516
  if (code) {
×
1517
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1518
  }
1519

1520
  return code;
×
1521
}
1522

1523
static int32_t extWinGetSetWinResBlockBuf(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
×
1524
  SExternalWindowOperator* pExtW = pOperator->info;
×
1525
  SList*                   pList = NULL;
×
1526
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1527
  
1528
  if (pWin->winOutIdx >= 0) {
×
1529
    pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1530
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, false));
×
1531
  } else {
1532
    if (extWinLastWinClosed(pExtW)) {
×
1533
      pWin->winOutIdx = pExtW->outWinIdx - 1;
×
1534
      pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1535
    } else {
1536
      pWin->winOutIdx = pExtW->outWinIdx++;
×
1537
      pList = tdListNew(POINTER_BYTES * 2);
×
1538
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);
×
1539
      SList** ppList = taosArrayGet(pExtW->pOutputBlocks, pWin->winOutIdx);
×
1540
      extWinRecycleBlockList(pExtW, ppList);
×
1541
      *ppList = pList;
×
1542
    }
1543
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, true));
×
1544
  }
1545
  
1546
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pExtW, pList, rows, ppRes, ppIdx));
×
1547

1548
_exit:
×
1549

1550
  if (code) {
×
1551
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1552
  }
1553

1554
  return code;
×
1555
}
1556

1557

1558
static int32_t extWinIndefRowsDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
×
1559
  SExternalWindowOperator* pExtW = pOperator->info;
×
1560
  SSDataBlock*             pResBlock = NULL;
×
1561
  SArray*                  pIdx = NULL;
×
1562
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1563
  
1564
  TAOS_CHECK_EXIT(extWinGetSetWinResBlockBuf(pOperator, rows, pWin, &pResBlock, &pIdx));
×
1565
  
1566
  if (!pExtW->pTmpBlock) {
×
1567
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
×
1568
  } else {
1569
    blockDataCleanup(pExtW->pTmpBlock);
×
1570
  }
1571
  
1572
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
×
1573

1574
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
×
1575
  TAOS_CHECK_EXIT(extWinIndefRowsDoImpl(pOperator, pResBlock, pExtW->pTmpBlock));
×
1576

1577
  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock, extWinGetCurWinIdx(pOperator->pTaskInfo), rows));
×
1578

1579
_exit:
×
1580

1581
  if (code) {
×
1582
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1583
  }
1584
  
1585
  return code;
×
1586
}
1587

1588

1589
static int32_t extWinIndefRowsOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
×
1590
  SExternalWindowOperator* pExtW = pOperator->info;
×
1591
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
×
1592
  SExtWinTimeWindow*       pWin = NULL;
×
1593
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
×
1594
  int32_t                  startPos = 0, winRows = 0;
×
1595
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
1596
  
1597
  while (true) {
1598
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
×
1599
    if (pWin == NULL) {
×
1600
      break;
×
1601
    }
1602

1603
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") indefRows start, ascScan:%d, startPos:%d, winRows:%d",
×
1604
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1605
    
1606
    TAOS_CHECK_EXIT(extWinIndefRowsDo(pOperator, pInputBlock, startPos, winRows, pWin));
×
1607
    
1608
    startPos += winRows;
×
1609
  }
1610
  
1611
_exit:
×
1612

1613
  if (code) {
×
1614
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1615
  }
1616

1617
  return code;
×
1618
}
1619

1620
static int32_t extWinNonAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
1621
  SExternalWindowOperator* pExtW = pOperator->info;
×
1622
  int32_t                  numOfWin = pExtW->outWinIdx;
×
1623
  int32_t                  code = TSDB_CODE_SUCCESS;
×
1624
  int32_t                  lino = 0;
×
1625
  SSDataBlock*             pRes = NULL;
×
1626

1627
  for (; pExtW->outputWinId < numOfWin; pExtW->outputWinId++, extWinIncCurWinOutIdx(pOperator->pTaskInfo->pStreamRuntimeInfo)) {
×
1628
    SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
×
1629
    if (listNEles(pList) <= 0) {
×
1630
      continue;
×
1631
    }
1632

1633
    SListNode* pNode = tdListPopHead(pList);
×
1634
    pRes = *(SSDataBlock**)pNode->data;
×
1635
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = *(SArray**)((SArray**)pNode->data + 1);
×
1636
    pExtW->pLastBlkNode = pNode;
×
1637

1638
    if (listNEles(pList) <= 0) {
×
1639
      pExtW->outputWinId++;
×
1640
      extWinIncCurWinOutIdx(pOperator->pTaskInfo->pStreamRuntimeInfo);
×
1641
    }
1642

1643
    break;
×
1644
  }
1645

1646
  if (pRes) {
×
1647
    qDebug("%s result generated, rows:%" PRId64 , GET_TASKID(pOperator->pTaskInfo), pRes->info.rows);
×
1648
    pRes->info.version = pOperator->pTaskInfo->version;
×
1649
    pRes->info.dataLoad = 1;
×
1650
  } else {
1651
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = NULL;
×
1652
    qDebug("%s ext window done", GET_TASKID(pOperator->pTaskInfo));
×
1653
  }
1654

1655
  *ppRes = (pRes && pRes->info.rows > 0) ? pRes : NULL;
×
1656

1657
_exit:
×
1658

1659
  if (code != TSDB_CODE_SUCCESS) {
×
1660
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1661
  }
1662

1663
  return code;
×
1664
}
1665

1666
static int32_t extWinAggHandleEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains, SExtWinTimeWindow* pWin) {
30,882✔
1667
  int32_t code = 0, lino = 0;
30,882✔
1668
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
30,882✔
1669
  SExprSupp* pSup = &pOperator->exprSupp;
30,882✔
1670
  int32_t currIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
30,882✔
1671

1672
  if (NULL == pExtW->pEmptyInputBlock || (pWin && pWin->tw.skey == pExtW->lastSKey)) {
30,881✔
1673
    goto _exit;
5,958✔
1674
  }
1675

1676
  bool ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
24,923✔
1677
  int32_t endIdx = allRemains ? (pExtW->pWins->size - 1) : (currIdx - 1);
24,923✔
1678
  SResultRowInfo* pResultRowInfo = &pExtW->binfo.resultRowInfo;
24,923✔
1679
  SSDataBlock* pInput = pExtW->pEmptyInputBlock;
24,923✔
1680

1681
  if ((pExtW->lastWinId + 1) <= endIdx) {
24,923✔
1682
    TAOS_CHECK_EXIT(setInputDataBlock(pSup, pExtW->pEmptyInputBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
82!
1683
  }
1684
  
1685
  for (int32_t i = pExtW->lastWinId + 1; i <= endIdx; ++i) {
25,173✔
1686
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
250✔
1687

1688
    extWinSetCurWinIdx(pOperator, i);
250✔
1689
    qDebug("%s %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
250✔
1690
           GET_TASKID(pOperator->pTaskInfo), i, pWin->tw.skey, pWin->tw.ekey, ascScan);
1691

1692
    TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pWin, pSup, &pExtW->aggSup, pOperator->pTaskInfo));
250!
1693

1694
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
250✔
1695
    code = extWinAggDo(pOperator, 0, 1, pInput);
250✔
1696
    pExtW->lastWinId = i;  
250✔
1697
    TAOS_CHECK_EXIT(code);
250!
1698
  }
1699

1700
  
1701
_exit:
24,923✔
1702

1703
  if (code) {
30,881!
1704
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
1705
  } else {
1706
    if (pBlock) {
30,881✔
1707
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
30,606!
1708
    }
1709

1710
    if (!allRemains) {
30,878✔
1711
      extWinSetCurWinIdx(pOperator, currIdx);  
30,603✔
1712
    }
1713
  }
1714

1715
  return code;
30,878✔
1716
}
1717

1718
static int32_t extWinAggOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
10,524✔
1719
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
10,524✔
1720
  int32_t                  startPos = 0, winRows = 0;
10,524✔
1721
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
10,524✔
1722
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
10,524✔
1723
  int32_t                  code = 0, lino = 0;
10,524✔
1724
  SExtWinTimeWindow*       pWin = NULL;
10,524✔
1725
  bool                     scalarCalc = false;
10,524✔
1726

1727
  while (true) {
1728
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
41,126!
1729
    if (pWin == NULL) {
41,128✔
1730
      break;
10,522✔
1731
    }
1732

1733
    TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pInputBlock, false, pWin));
30,606!
1734

1735
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") agg start, ascScan:%d, startPos:%d, winRows:%d",
30,603✔
1736
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, ascScan, startPos, winRows);        
1737

1738
    if (!scalarCalc) {
30,607✔
1739
      if (pExtW->scalarSupp.pExprInfo) {
10,321!
1740
        SExprSupp* pScalarSup = &pExtW->scalarSupp;
×
1741
        TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pInputBlock, pInputBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
1742
                                     pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
1743
      }
1744
      
1745
      scalarCalc = true;
10,321✔
1746
    }
1747

1748
    if (pWin->tw.skey != pExtW->lastSKey) {
30,607✔
1749
      TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo));
24,766!
1750
    }
1751
    
1752
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
30,606✔
1753
    TAOS_CHECK_EXIT(extWinAggDo(pOperator, startPos, winRows, pInputBlock));
30,606!
1754
    
1755
    pExtW->lastSKey = pWin->tw.skey;
30,606✔
1756
    pExtW->lastWinId = extWinGetCurWinIdx(pOperator->pTaskInfo);
30,606✔
1757
    startPos += winRows;
30,602✔
1758
  }
1759

1760
_exit:
10,522✔
1761

1762
  if (code) {
10,522!
1763
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1764
  }
1765

1766
  return code;
10,522✔
1767
}
1768

1769

1770
static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
275✔
1771
  SExternalWindowOperator* pExtW = pOperator->info;
275✔
1772
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
275✔
1773
  SSDataBlock*    pBlock = pExtW->binfo.pRes;
275✔
1774
  int32_t         code = TSDB_CODE_SUCCESS;
275✔
1775
  int32_t         lino = 0;
275✔
1776
  SExprInfo*      pExprInfo = pOperator->exprSupp.pExprInfo;
275✔
1777
  int32_t         numOfExprs = pOperator->exprSupp.numOfExprs;
275✔
1778
  int32_t*        rowEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;
275✔
1779
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
275✔
1780
  int32_t         numOfWin = pExtW->outWinIdx;
275✔
1781

1782
  pBlock->info.version = pTaskInfo->version;
275✔
1783
  blockDataCleanup(pBlock);
275✔
1784
  taosArrayClear(pExtW->pWinRowIdx);
275✔
1785

1786
  for (; pExtW->outputWinId < numOfWin; pExtW->outputWinId += 1) {
1,130✔
1787
    SResultRow* pRow = (SResultRow*)((char*)pExtW->pResultRow + pExtW->outputWinId * pExtW->aggSup.resultRowSize);
855✔
1788

1789
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
855✔
1790

1791
    // no results, continue to check the next one
1792
    if (pRow->numOfRows == 0) {
854!
1793
      continue;
×
1794
    }
1795

1796
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
854✔
1797
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + numOfWin - pExtW->outputWinId;
204✔
1798
      TAOS_CHECK_EXIT(blockDataEnsureCapacity(pBlock, newSize));
204!
1799
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
204✔
1800
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
1801
    }
1802

1803
    TAOS_CHECK_EXIT(copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo));
854!
1804

1805
    pBlock->info.rows += pRow->numOfRows;
855✔
1806

1807
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pRow->winIdx, pRow->numOfRows));
855!
1808

1809
    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
855!
1810
      break;
×
1811
    }
1812
  }
1813

1814
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
275✔
1815
         pBlock->info.id.groupId);
1816
         
1817
  pBlock->info.dataLoad = 1;
275✔
1818

1819
  *ppRes = (pBlock->info.rows > 0) ? pBlock : NULL;
275✔
1820
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;
275✔
1821

1822
_exit:
275✔
1823

1824
  if (code != TSDB_CODE_SUCCESS) {
275!
1825
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1826
  }
1827

1828
  return code;
275✔
1829
}
1830

1831
static int32_t extWinInitResultRow(SExecTaskInfo*        pTaskInfo, SExternalWindowOperator* pExtW, int32_t winNum) {
275✔
1832
  if (EEXT_MODE_SCALAR == pExtW->mode) {
275!
1833
    return TSDB_CODE_SUCCESS;
×
1834
  }
1835

1836
  if (winNum <= pExtW->resultRowCapacity) {
275✔
1837
    return TSDB_CODE_SUCCESS;
61✔
1838
  }
1839
  
1840
  taosMemoryFreeClear(pExtW->pResultRow);
214!
1841
  pExtW->resultRowCapacity = -1;
214✔
1842

1843
  int32_t code = 0, lino = 0;
214✔
1844
  
1845
  pExtW->pResultRow = taosMemoryCalloc(winNum, pExtW->aggSup.resultRowSize);
214!
1846
  QUERY_CHECK_NULL(pExtW->pResultRow, code, lino, _exit, terrno);
214!
1847

1848
  pExtW->resultRowCapacity = winNum;
214✔
1849

1850
_exit:
214✔
1851

1852
  if (code) {
214!
1853
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1854
  }
1855

1856
  return code;
214✔
1857
}
1858

1859
static void extWinFreeResultRow(SExternalWindowOperator* pExtW) {
275✔
1860
  if (pExtW->resultRowCapacity * pExtW->aggSup.resultRowSize >= 1048576) {
275!
1861
    taosMemoryFreeClear(pExtW->pResultRow);
×
1862
    pExtW->resultRowCapacity = -1;
×
1863
  }
1864
  if (pExtW->binfo.pRes && pExtW->binfo.pRes->info.rows * pExtW->aggSup.resultRowSize >= 1048576) {
275!
1865
    blockDataFreeCols(pExtW->binfo.pRes);
×
1866
  }
1867
}
275✔
1868

1869
static int32_t extWinInitWindowList(SExternalWindowOperator* pExtW, SExecTaskInfo*        pTaskInfo) {
275✔
1870
  if (taosArrayGetSize(pExtW->pWins) > 0) {
275!
1871
    return TSDB_CODE_SUCCESS;
×
1872
  }
1873
  
1874
  int32_t code = 0, lino = 0;
275✔
1875
  SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
275✔
1876
  size_t size = taosArrayGetSize(pInfo->pStreamPesudoFuncVals);
275✔
1877
  SExtWinTimeWindow* pWin = taosArrayReserve(pExtW->pWins, size);
275✔
1878
  TSDB_CHECK_NULL(pWin, code, lino, _exit, terrno);
275!
1879

1880
  TAOS_CHECK_EXIT(extWinInitResultRow(pTaskInfo, pExtW, size));
275!
1881

1882
  if (pExtW->timeRangeExpr && pExtW->timeRangeExpr->needCalc) {
275!
1883
    TAOS_CHECK_EXIT(scalarCalculateExtWinsTimeRange(pExtW->timeRangeExpr, pInfo, pWin));
93!
1884
    if (qDebugFlag & DEBUG_DEBUG) {
93!
1885
      for (int32_t i = 0; i < size; ++i) {
227✔
1886
        qDebug("%s the %d/%d ext window calced initialized, TR[%" PRId64 ", %" PRId64 ")", 
134!
1887
            pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
1888
      }
1889
    }
1890
  } else {
1891
    for (int32_t i = 0; i < size; ++i) {
910✔
1892
      SSTriggerCalcParam* pParam = taosArrayGet(pInfo->pStreamPesudoFuncVals, i);
727✔
1893

1894
      pWin[i].tw.skey = pParam->wstart;
727✔
1895
      pWin[i].tw.ekey = pParam->wend + ((pInfo->triggerType != STREAM_TRIGGER_SLIDING) ? 1 : 0);
727✔
1896
      pWin[i].winOutIdx = -1;
727✔
1897

1898
      qDebug("%s the %d/%d ext window initialized, TR[%" PRId64 ", %" PRId64 ")", 
727✔
1899
          pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
1900
    }
1901
  }
1902
  
1903
  pExtW->outputWinId = pInfo->curIdx;
276✔
1904
  pExtW->lastWinId = -1;
276✔
1905
  pExtW->blkWinStartIdx = pInfo->curIdx;
276✔
1906

1907
_exit:
276✔
1908

1909
  if (code) {
276!
1910
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1911
  }
1912

1913
  return code;
275✔
1914
}
1915

1916
static bool extWinNonAggGotResBlock(SExternalWindowOperator* pExtW) {
×
1917
  if (pExtW->multiTableMode && !pExtW->inputHasOrder) {
×
1918
    return false;
×
1919
  }
1920
  int32_t remainWin = pExtW->outWinIdx - pExtW->outputWinId;
×
1921
  if (remainWin > 1 && (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc)) {
×
1922
    return true;
×
1923
  }
1924
  
1925
  SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outputWinId);
×
1926
  if (!pList || listNEles(pList) <= 0) {
×
1927
    return false;
×
1928
  }
1929
  if (listNEles(pList) > 1) {
×
1930
    return true;
×
1931
  }
1932

1933
  SListNode* pNode = listHead(pList);
×
1934
  SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
×
1935
  int32_t* winIdx = taosArrayGetLast(pIdx);
×
1936
  if (winIdx && *winIdx < pExtW->blkWinStartIdx) {
×
1937
    return true;
×
1938
  }
1939

1940
  return false;
×
1941
}
1942

1943
static int32_t extWinOpen(SOperatorInfo* pOperator) {
275✔
1944
  if (OPTR_IS_OPENED(pOperator)) {
275!
1945
    return TSDB_CODE_SUCCESS;
×
1946
  }
1947
  
1948
  int32_t                  code = 0;
275✔
1949
  int32_t                  lino = 0;
275✔
1950
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
275✔
1951
  SOperatorInfo*           pDownstream = pOperator->pDownstream[0];
275✔
1952
  SExternalWindowOperator* pExtW = pOperator->info;
275✔
1953
  SExprSupp*               pSup = &pOperator->exprSupp;
275✔
1954
  
1955
  TAOS_CHECK_EXIT(extWinInitWindowList(pExtW, pTaskInfo));
275!
1956

1957
  while (1) {
10,522✔
1958
    pExtW->blkWinIdx = -1;
10,797✔
1959
    pExtW->blkWinStartSet = false;
10,797✔
1960
    pExtW->blkRowStartIdx = 0;
10,797✔
1961
    
1962
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
10,797✔
1963
    if (pBlock == NULL) {
10,799✔
1964
      if (EEXT_MODE_AGG == pExtW->mode) {
275!
1965
        TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pBlock, true, NULL));
275!
1966
      }
1967
      pExtW->blkWinStartIdx = pExtW->pWins->size;
275✔
1968
      break;
275✔
1969
    }
1970

1971
    printDataBlock(pBlock, __func__, pTaskInfo->id.str, pTaskInfo->id.queryId);
10,524✔
1972

1973
    qDebug("ext window mode:%d got %" PRId64 " rows from downstream", pExtW->mode, pBlock->info.rows);
10,524✔
1974
    
1975
    switch (pExtW->mode) {
10,524!
1976
      case EEXT_MODE_SCALAR:
×
1977
        TAOS_CHECK_EXIT(extWinProjectOpen(pOperator, pBlock));
×
1978
        if (extWinNonAggGotResBlock(pExtW)) {
×
1979
          return code;
×
1980
        }
1981
        break;
×
1982
      case EEXT_MODE_AGG:
10,524✔
1983
        TAOS_CHECK_EXIT(extWinAggOpen(pOperator, pBlock));
10,524!
1984
        break;
10,522✔
1985
      case EEXT_MODE_INDEFR_FUNC:
×
1986
        TAOS_CHECK_EXIT(extWinIndefRowsOpen(pOperator, pBlock));
×
1987
        if (extWinNonAggGotResBlock(pExtW)) {
×
1988
          return code;
×
1989
        }
1990
        break;
×
1991
      default:
×
1992
        break;
×
1993
    }
1994
  }
1995

1996
  OPTR_SET_OPENED(pOperator);
275✔
1997

1998
#if 0
1999
  if (pExtW->mode == EEXT_MODE_AGG) {
2000
    qDebug("ext window before dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
2001

2002
    code = initGroupedResultInfo(&pExtW->groupResInfo, pExtW->aggSup.pResultRowHashTable, pExtW->binfo.inputTsOrder);
2003
    QUERY_CHECK_CODE(code, lino, _exit);
2004

2005
    qDebug("ext window after dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
2006
  }
2007
#endif
2008

2009
_exit:
275✔
2010

2011
  if (code != 0) {
275!
2012
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
2013
    pTaskInfo->code = code;
×
2014
    T_LONG_JMP(pTaskInfo->env, code);
×
2015
  }
2016
  
2017
  return code;
275✔
2018
}
2019

2020
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
542✔
2021
  int32_t                  code = 0;
542✔
2022
  int32_t                  lino = 0;
542✔
2023
  SExternalWindowOperator* pExtW = pOperator->info;
542✔
2024
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
542✔
2025

2026
  if (pOperator->status == OP_EXEC_DONE) {
542✔
2027
    *ppRes = NULL;
267✔
2028
    return code;
267✔
2029
  }
2030

2031
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);
275✔
2032

2033
  TAOS_CHECK_EXIT(pOperator->fpSet._openFn(pOperator));
275!
2034

2035
  if (pExtW->mode == EEXT_MODE_SCALAR || pExtW->mode == EEXT_MODE_INDEFR_FUNC) {
275!
2036
    TAOS_CHECK_EXIT(extWinNonAggOutputRes(pOperator, ppRes));
×
2037
    if (NULL == *ppRes) {
×
2038
      setOperatorCompleted(pOperator);
×
2039
      extWinFreeResultRow(pExtW);
×
2040
    }
2041
  } else {
2042
#if 0    
2043
    doBuildResultDatablock(pOperator, &pExtW->binfo, &pExtW->groupResInfo, pExtW->aggSup.pResultBuf);
2044
    bool hasRemain = hasRemainResults(&pExtW->groupResInfo);
2045
    if (!hasRemain) {
2046
      setOperatorCompleted(pOperator);
2047
      break;
2048
    }
2049
    if (pExtW->binfo.pRes->info.rows > 0) break;
2050
#else
2051
    TAOS_CHECK_EXIT(extWinAggOutputRes(pOperator, ppRes));
275!
2052
    setOperatorCompleted(pOperator);
275✔
2053
    extWinFreeResultRow(pExtW);
275✔
2054
#endif      
2055
  }
2056

2057
  if (*ppRes) {
275✔
2058
    pOperator->resultInfo.totalRows += (*ppRes)->info.rows;
270✔
2059
  }
2060
  
2061
_exit:
5✔
2062

2063
  if (code) {
275!
2064
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
×
2065
    pTaskInfo->code = code;
×
2066
    T_LONG_JMP(pTaskInfo->env, code);
×
2067
  }
2068

2069
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM && (*ppRes)) {
275!
2070
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
270✔
2071
  }
2072
  
2073
  return code;
275✔
2074
}
2075

2076

2077
int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
199✔
2078
                                     SOperatorInfo** pOptrOut) {
2079
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
199✔
2080
  QRY_PARAM_CHECK(pOptrOut);
199!
2081
  int32_t                  code = 0;
199✔
2082
  int32_t                  lino = 0;
199✔
2083
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
199!
2084
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
199!
2085
  pOperator->pPhyNode = pNode;
199✔
2086
  if (!pExtW || !pOperator) {
199!
2087
    code = terrno;
×
2088
    lino = __LINE__;
×
2089
    goto _error;
×
2090
  }
2091
  
2092
  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
199✔
2093
                  pExtW, pTaskInfo);
2094
                  
2095
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
199✔
2096
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
199!
2097
  initBasicInfo(&pExtW->binfo, pResBlock);
199✔
2098

2099
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
199✔
2100
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
199!
2101
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
199✔
2102
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
199✔
2103

2104
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
199!
2105
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
199✔
2106
  }
2107

2108
  // pExtW->limitInfo = (SLimitInfo){0};
2109
  // initLimitInfo(pPhynode->window.node.pLimit, pPhynode->window.node.pSlimit, &pExtW->limitInfo);
2110

2111
  if (pPhynode->window.pProjs) {
199!
2112
    int32_t    numOfScalarExpr = 0;
×
2113
    SExprInfo* pScalarExprInfo = NULL;
×
2114
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
×
2115
    QUERY_CHECK_CODE(code, lino, _error);
×
2116

2117
    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
×
2118
    QUERY_CHECK_CODE(code, lino, _error);
×
2119

2120
  //if (pExtW->multiTableMode) {
2121
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
×
2122
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _error);
×
2123
  //}
2124
    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
×
2125
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
×
2126
  } else if (pExtW->mode == EEXT_MODE_AGG) {
199!
2127
    if (pPhynode->window.pExprs != NULL) {
199!
2128
      int32_t    num = 0;
×
2129
      SExprInfo* pSExpr = NULL;
×
2130
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
2131
      QUERY_CHECK_CODE(code, lino, _error);
×
2132
    
2133
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
2134
      if (code != TSDB_CODE_SUCCESS) {
×
2135
        goto _error;
×
2136
      }
2137
      checkIndefRowsFuncs(&pExtW->scalarSupp);
×
2138
    }
2139
    
2140
    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
199✔
2141
    initResultSizeInfo(&pOperator->resultInfo, 4096);
199✔
2142
    //code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
2143
    //QUERY_CHECK_CODE(code, lino, _error);
2144

2145
    pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
199✔
2146
    TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
199!
2147
    
2148
    int32_t num = 0;
199✔
2149
    SExprInfo* pExprInfo = NULL;
199✔
2150
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
199✔
2151
    QUERY_CHECK_CODE(code, lino, _error);
199!
2152
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, 0, 0);
199✔
2153
    QUERY_CHECK_CODE(code, lino, _error);
199!
2154

2155
    nodesWalkExprs(pPhynode->window.pFuncs, extWinHasCountLikeFunc, &pExtW->hasCountFunc);
199✔
2156
    if (pExtW->hasCountFunc) {
199✔
2157
      code = extWinCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
163✔
2158
      QUERY_CHECK_CODE(code, lino, _error);
163!
2159
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
163✔
2160
    } else {
2161
      qDebug("%s ext window doesn't have CountLikeFunc", pOperator->pTaskInfo->id.str);
36!
2162
    }
2163

2164
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
199✔
2165
    QUERY_CHECK_CODE(code, lino, _error);
199!
2166

2167
    pExtW->lastSKey = INT64_MIN;
199✔
2168
  } else {
2169
    size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2170
    
2171
    if (pPhynode->window.pExprs != NULL) {
×
2172
      int32_t    num = 0;
×
2173
      SExprInfo* pSExpr = NULL;
×
2174
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
2175
      QUERY_CHECK_CODE(code, lino, _error);
×
2176
    
2177
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
2178
      if (code != TSDB_CODE_SUCCESS) {
×
2179
        goto _error;
×
2180
      }
2181
    }
2182
    
2183
    int32_t    numOfExpr = 0;
×
2184
    SExprInfo* pExprInfo = NULL;
×
2185
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
×
2186
    TSDB_CHECK_CODE(code, lino, _error);
×
2187
    
2188
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
×
2189
                              pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2190
    TSDB_CHECK_CODE(code, lino, _error);
×
2191
    pOperator->exprSupp.hasWindowOrGroup = false;
×
2192
    
2193
    //code = setFunctionResultOutput(pOperator, &pExtW->binfo, &pExtW->aggSup, MAIN_SCAN, numOfExpr);
2194
    //TSDB_CHECK_CODE(code, lino, _error);
2195
    
2196
    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
×
2197
                              pTaskInfo->pStreamRuntimeInfo);
×
2198
    TSDB_CHECK_CODE(code, lino, _error);
×
2199
    
2200
    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
×
2201
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
×
2202
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
×
2203
    TSDB_CHECK_CODE(code, lino, _error);
×
2204

2205
  //if (pExtW->multiTableMode) {
2206
    pExtW->pOutputBlocks = taosArrayInit_s(POINTER_BYTES, STREAM_CALC_REQ_MAX_WIN_NUM);
×
2207
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _error);
×
2208
  //}
2209
    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
×
2210
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);  
×
2211
  }
2212

2213
  pExtW->pWins = taosArrayInit(4096, sizeof(SExtWinTimeWindow));
199✔
2214
  if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _error);
199!
2215
  
2216
  //initResultRowInfo(&pExtW->binfo.resultRowInfo);
2217

2218
  pExtW->timeRangeExpr = (STimeRangeNode*)pPhynode->pTimeRange;
199✔
2219
  if (pExtW->timeRangeExpr) {
199!
2220
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pStart, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
199!
2221
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pEnd, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
199!
2222
  }
2223

2224
  if (pPhynode->isSingleTable) {
199✔
2225
    pExtW->getWinFp = (pExtW->timeRangeExpr && (pExtW->timeRangeExpr->needCalc || (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP))) ? extWinGetOvlpWin : extWinGetNoOvlpWin;
75!
2226
    pExtW->multiTableMode = false;
75✔
2227
  } else {
2228
    pExtW->getWinFp = (pExtW->timeRangeExpr && (pExtW->timeRangeExpr->needCalc || (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP))) ? extWinGetMultiTbOvlpWin : extWinGetMultiTbNoOvlpWin;
124!
2229
    pExtW->multiTableMode = true;
124✔
2230
  }
2231
  pExtW->inputHasOrder = pPhynode->inputHasOrder;
199✔
2232

2233
  pOperator->fpSet = createOperatorFpSet(extWinOpen, extWinNext, NULL, destroyExternalWindowOperatorInfo,
199✔
2234
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2235
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
199✔
2236
  code = appendDownstream(pOperator, &pDownstream, 1);
199✔
2237
  if (code != 0) {
199!
2238
    goto _error;
×
2239
  }
2240

2241
  *pOptrOut = pOperator;
199✔
2242
  return code;
199✔
2243

2244
_error:
×
2245

2246
  if (pExtW != NULL) {
×
2247
    destroyExternalWindowOperatorInfo(pExtW);
×
2248
  }
2249

2250
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
2251
  pTaskInfo->code = code;
×
2252
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
×
2253
  return code;
×
2254
}
2255

2256

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc