• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.85
/source/libs/executor/src/externalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22
#include "cmdnodes.h"
23

24
#define EXT_WIN_RES_ROWS_ALLOC_SIZE 10
25
#define EXT_WIN_CALC_GROUP_SIZE 1000
26
#define EXT_WIN_TYPE_STR(_isMAExtW) ((_isMAExtW) ? "mergeAligned" : "")
27

28
typedef struct SBlockList {
29
  const SSDataBlock* pSrcBlock;
30
  SList*             pBlocks;
31
  int32_t            blockRowNumThreshold;
32
} SBlockList;
33

34

35
typedef int32_t (*extWinGetWinFp)(SOperatorInfo*, int64_t*, int32_t*, SDataBlockInfo*, SExtWinTimeWindow**, int32_t*);
36

37
typedef struct SExtWindowStat {
38
  int64_t resBlockCreated;
39
  int64_t resBlockDestroyed;
40
  int64_t resBlockRecycled;
41
  int64_t resBlockReused;
42
  int64_t resBlockAppend;
43
} SExtWindowStat;
44

45

46
typedef struct SExtWinCalcGrpCtx {
47
  uint64_t           groupId;
48
  SArray*            pWins;           // SArray<SExtWinTimeWindow>
49
  int32_t            curIdx; // for pesudo func calculation
50

51
  bool               blkWinStartSet;
52
  int32_t            blkWinStartIdx;
53
  int32_t            blkWinIdx;
54
  int32_t            blkRowStartIdx;
55

56
  SArray*            outWinBufIdx;
57
  int32_t            outWinTotalNum;      // agg: total output win num
58
  int32_t            outWinNum;           // already output win num
59
  int32_t            outWinIdx;           // current output win idx
60
  int32_t            outWinLastIdx;
61
  
62
  int32_t            lastWinIdx;
63
  int64_t            lastSKey;
64
  int64_t            lastEKey;
65
  int32_t            lastWinId;  
66
} SExtWinCalcGrpCtx;
67

68
typedef struct SExtWinTrigGrpCtx {
69
  SExtWinCalcGrpCtx* pCCtx;
70
  int32_t            lastCtxIter;
71

72
  SSHashObj*         pCGCtxs; // calc groups ctxs, groupId => SExtWinCalcGrpCtx
73
} SExtWinTrigGrpCtx;
74

75

76
typedef struct SExtWinResultRows {
77
  int32_t             resRowsSize;
78
  int32_t             resRowsIdx;
79
  int32_t             resRowSize;
80
  int32_t             resRowIdx;
81
  int64_t             resRowAllcNum;
82
  SResultRow**        pResultRows;
83
} SExtWinResultRows;
84

85
typedef struct SExternalWindowOperator {
86
  SOptrBasicInfo     binfo;
87
  SExprSupp          scalarSupp;
88
  int32_t            primaryTsIndex;
89
  EExtWinMode        mode;
90
  bool               multiTableMode;
91
  bool               inputHasOrder;
92
  bool               needGroupSort;
93
  bool               extWinSplit;
94
  bool               calcWithPartition;
95
  SArray*            pPseudoColInfo;  
96
  STimeRangeNode*    timeRangeExpr;
97
  
98
  extWinGetWinFp     getWinFp;
99

100
  uint64_t           lastTGrpId;
101
  uint64_t           lastCGrpId;
102
  SExtWinTrigGrpCtx* pTGrpCtx;
103
  bool               ownTGrpCtx;
104
  SArray*            pGrpIds;       // single calc or trig group ids
105
  SArray*            pCTGrpIds;
106
  
107
  // for project&indefRows
108
  SList*             pFreeBlocks;    // SList<SSDatablock*+SAarray*>
109
  SArray*            pOutputBlocks;  // SArray<SList*>, for each window, we have a list of blocks
110
  SListNode*         pLastBlkNode; 
111
  SSDataBlock*       pTmpBlock;
112
  
113
  // for agg
114
  SAggSupporter      aggSup;
115
  STimeWindowAggSupp twAggSup;
116

117
  SExtWinResultRows  resultRows;
118

119
  // for output
120
  SSTriggerGroupCalcInfo* pLastOutput;
121
  int32_t                 lastOutputIter;
122
  int32_t                 lastGrpIdx;    // index in pGrpIds or pCTGrpIds
123

124
  int32_t            outWinIdx;
125
  int32_t            resWinIdx;        // for result win allocation
126
  SSDataBlock*       pEmptyInputBlock;
127
  bool               hasCountFunc;
128
  SExtWindowStat     stat;
129
  SArray*            pWinRowIdx;
130
  bool               isMergeAlignedExtW;
131

132
  // for vtable window query
133
  bool               isDynWindow;
134
  int32_t            orgTableVgId;
135
  tb_uid_t           orgTableUid;
136
  STimeWindow        orgTableTimeRange;
137

138
  SExecTaskInfo*     pTaskInfo;  // for pRunnerGrpCtx cleanup in destroy
139
} SExternalWindowOperator;
140

141
static char* extWinModeStr(EExtWinMode mode) {
4,780,642✔
142
  switch (mode) {
4,780,642✔
143
    case EEXT_MODE_SCALAR:
87,264✔
144
      return "scalar";
87,264✔
145
    case EEXT_MODE_AGG:
4,693,378✔
146
      return "agg";
4,693,378✔
NEW
147
    case EEXT_MODE_INDEFR_FUNC:
×
NEW
148
      return "indefRows";
×
NEW
149
    default:
×
NEW
150
      break;
×
151
  }
152

NEW
153
  return "unknown";
×
154
}
155

156

157
static int extWinGrpIdCompare(const void* p1, const void* p2) {
808✔
158
  uint64_t* gId1 = (uint64_t*)p1;
808✔
159
  uint64_t* gId2 = (uint64_t*)p2;
808✔
160

161
  if (*gId1 == *gId2) {
808✔
NEW
162
    return 0;
×
163
  }
164
  
165
  return (*gId1) < (*gId2) ? -1 : 1;
808✔
166
}
167

168
static void extWinDestroyCGrpCtx(void* param) {
1,378,586✔
169
  if (NULL == param) {
1,378,586✔
NEW
170
    return;
×
171
  }
172

173
  SExtWinCalcGrpCtx* pCtx = (SExtWinCalcGrpCtx*)param;
1,378,586✔
174

175
  taosArrayDestroy(pCtx->pWins);
1,378,586✔
176
}
177

178
static void extWinDestroyTGrpCtx(void* param) {
1,246,882✔
179
  if (NULL == param) {
1,246,882✔
NEW
180
    return;
×
181
  }
182

183
  SExtWinTrigGrpCtx* pCtx = (SExtWinTrigGrpCtx*)param;
1,246,882✔
184

185
  if (pCtx->pCGCtxs) {
1,246,882✔
186
    tSimpleHashSetFreeFp(pCtx->pCGCtxs, extWinDestroyCGrpCtx);
91,304✔
187
    tSimpleHashCleanup(pCtx->pCGCtxs);
91,304✔
188
  } else if (pCtx->pCCtx) {
1,155,578✔
189
    extWinDestroyCGrpCtx(pCtx->pCCtx);
1,155,578✔
190
    taosMemoryFree(pCtx->pCCtx);
1,155,578✔
191
  }
192
}
193

194
static int32_t extWinBlockListAddBlock(SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
4,090,904✔
195
  SSDataBlock* pRes = NULL;
4,090,904✔
196
  int32_t code = 0, lino = 0;
4,090,904✔
197

198
  if (listNEles(pExtW->pFreeBlocks) > 0) {
4,090,904✔
199
    SListNode* pNode = tdListPopHead(pExtW->pFreeBlocks);
×
200
    *ppBlock = *(SSDataBlock**)pNode->data;
×
201
    *ppIdx = *(SArray**)((SArray**)pNode->data + 1);
×
202
    tdListAppendNode(pList, pNode);
×
203
    pExtW->stat.resBlockReused++;
×
204
  } else {
205
    TAOS_CHECK_EXIT(createOneDataBlock(pExtW->binfo.pRes, false, &pRes));
4,090,904✔
206
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, TMAX(rows, 4096)));
4,090,904✔
207
    SArray* pIdx = taosArrayInit(10, sizeof(int64_t));
4,090,904✔
208
    TSDB_CHECK_NULL(pIdx, code, lino, _exit, terrno);
4,090,904✔
209
    void* res[2] = {pRes, pIdx};
4,090,904✔
210
    TAOS_CHECK_EXIT(tdListAppend(pList, res));
4,090,904✔
211

212
    *ppBlock = pRes;
4,089,288✔
213
    *ppIdx = pIdx;
4,089,288✔
214
    pExtW->stat.resBlockCreated++;
4,089,288✔
215
  }
216

217
  (*ppBlock)->info.id.groupId = pExtW->lastCGrpId;
4,090,904✔
218
  // ensure projection path blocks also carry the matching T-group id
219
  (*ppBlock)->info.id.baseGId = pExtW->lastTGrpId;
4,090,904✔
220

221
  
222
_exit:
4,090,904✔
223

224
  if (code) {
4,090,904✔
225
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
226
    blockDataDestroy(pRes);
×
227
  }
228
  
229
  return code;
4,090,904✔
230
}
231

232
static void extWinPostUpdateStreamRt(SStreamRuntimeFuncInfo* pStream, SOperatorInfo* pOperator, SExternalWindowOperator* pExtW) {
4,190,288✔
233
  if (pStream->curGrpCalc) {
4,190,288✔
234
    pStream->createTable = &pStream->curGrpCalc->createTable;
4,077,572✔
235
    pStream->pStreamPesudoFuncVals = pStream->curGrpCalc->pParams;
4,077,572✔
236
    pStream->pStreamPartColVals = pStream->curGrpCalc->pGroupColVals;
4,077,572✔
237
  }
238
  
239
  pStream->groupId = pExtW->lastTGrpId;
4,190,288✔
240

241
  qDebug("%s streamRt updated, groupId %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pStream->groupId);
4,190,288✔
242
}
4,190,692✔
243

244
static void extWinAssignBlockGrpId(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, SBlockID* pId) {
7,642,511✔
245
  uint64_t currentCGrpId = (pExtW->pTGrpCtx != NULL && pExtW->pTGrpCtx->pCCtx != NULL)
15,116,150✔
246
                               ? pExtW->pTGrpCtx->pCCtx->groupId
7,418,291✔
247
                               : pExtW->lastCGrpId;
15,115,746✔
248

249
  if (pExtW->calcWithPartition && !pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.isMultiGroupCalc) {
7,642,511✔
250
    // For partitioned non-multi-group output, the effective merge group is the
251
    // current calc-group id. If we keep using lastTGrpId here, all output
252
    // blocks collapse to group 0 and the downstream merge path may concatenate
253
    // different partition groups without re-ordering by _group_id.
254
    pId->groupId = currentCGrpId;
4,223,820✔
255
    pId->baseGId = 0;
4,223,820✔
256
  } else if (pExtW->extWinSplit) {
3,418,691✔
257
    pId->groupId = currentCGrpId;
208,464✔
258
    pId->baseGId = pExtW->lastTGrpId;
208,464✔
259
  } else {
260
    pId->groupId = pExtW->lastTGrpId;
3,210,227✔
261
    pId->baseGId = 0;
3,210,227✔
262
  }
263

264
  qDebug("%s extWin res block assigned groupId %" PRIu64 " baseGid %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pId->groupId, pId->baseGId);
7,642,511✔
265
}
7,642,511✔
266

267
static FORCE_INLINE void extWinResetBlockCalcState(SExtWinCalcGrpCtx* pCCtx) {
268
  if (pCCtx == NULL) {
4,399,670✔
NEW
269
    return;
×
270
  }
271

272
  pCCtx->blkWinIdx = -1;
4,399,670✔
273
  pCCtx->blkWinStartSet = false;
4,399,670✔
274
  pCCtx->blkRowStartIdx = 0;
4,399,670✔
275
}
276

277
static int32_t extWinGetLastBlockFromList(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, SList* pList, int32_t rows, SSDataBlock** ppBlock, SArray** ppIdx) {
10,180,396✔
278
  int32_t    code = 0, lino = 0;
10,180,396✔
279
  SSDataBlock* pRes = NULL;
10,180,396✔
280

281
  SListNode* pNode = TD_DLIST_TAIL(pList);
10,180,396✔
282
  if (NULL == pNode) {
10,180,396✔
283
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
4,089,692✔
284
    return code;
4,089,692✔
285
  }
286

287
  pRes = *(SSDataBlock**)pNode->data;
6,090,704✔
288
  if ((pRes->info.rows + rows) > pRes->info.capacity) {
6,090,704✔
289
    TAOS_CHECK_EXIT(extWinBlockListAddBlock(pExtW, pList, rows, ppBlock, ppIdx));
1,212✔
290
    return code;
1,212✔
291
  }
292

293
  extWinAssignBlockGrpId(pOperator, pExtW, &pRes->info.id);
6,089,492✔
294
  
295
  *ppIdx = *(SArray**)((SSDataBlock**)pNode->data + 1);
6,089,492✔
296
  *ppBlock = pRes;
6,089,492✔
297
  pExtW->stat.resBlockAppend++;
6,089,492✔
298

299
_exit:
6,089,492✔
300

301
  if (code) {
6,089,492✔
302
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
303
  }
304
  
305
  return code;
6,089,492✔
306
}
307

308
static void extWinDestroyBlockList(void* p) {
4,089,692✔
309
  if (NULL == p) {
4,089,692✔
310
    return;
×
311
  }
312

313
  SListNode* pTmp = NULL;
4,089,692✔
314
  SList** ppList = (SList**)p;
4,089,692✔
315
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
4,089,692✔
316
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
808✔
317
    while (pNode) {
1,616✔
318
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
808✔
319
      blockDataDestroy(pBlock);
808✔
320
      SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
808✔
321
      taosArrayDestroy(pIdx);
808✔
322
      pTmp = pNode;
808✔
323
      pNode = pNode->dl_next_;
808✔
324
      taosMemoryFree(pTmp);
808✔
325
    }
326
  }
327
  taosMemoryFree(*ppList);
4,089,692✔
328
}
329

330

331
static void extWinRecycleBlkNode(SExternalWindowOperator* pExtW, SListNode** ppNode) {
5,694,047✔
332
  if (NULL == ppNode || NULL == *ppNode) {
5,694,047✔
333
    return;
1,604,759✔
334
  }
335

336
  SSDataBlock* pBlock = *(SSDataBlock**)(*ppNode)->data;
4,089,288✔
337
  SArray* pIdx = *(SArray**)((SArray**)(*ppNode)->data + 1);
4,089,288✔
338
  
339
  if (listNEles(pExtW->pFreeBlocks) >= 10) {
4,089,288✔
340
    blockDataDestroy(pBlock);
4,031,920✔
341
    taosArrayDestroy(pIdx);
4,031,112✔
342
    taosMemoryFreeClear(*ppNode);
4,031,920✔
343
    pExtW->stat.resBlockDestroyed++;
4,031,920✔
344
    return;
4,031,920✔
345
  }
346
  
347
  blockDataCleanup(pBlock);
57,368✔
348
  taosArrayClear(pIdx);
57,368✔
349
  tdListPrependNode(pExtW->pFreeBlocks, *ppNode);
57,368✔
350
  *ppNode = NULL;
57,368✔
351
  pExtW->stat.resBlockRecycled++;
57,368✔
352
}
353

UNCOV
354
static void extWinRecycleBlockList(SExternalWindowOperator* pExtW, void* p) {
×
UNCOV
355
  if (NULL == p) {
×
356
    return;
×
357
  }
358

UNCOV
359
  SListNode* pTmp = NULL;
×
UNCOV
360
  SList** ppList = (SList**)p;
×
UNCOV
361
  if ((*ppList) && TD_DLIST_NELES(*ppList) > 0) {
×
362
    SListNode* pNode = TD_DLIST_HEAD(*ppList);
×
363
    while (pNode) {
×
364
      pTmp = pNode;
×
365
      pNode = pNode->dl_next_;
×
366
      extWinRecycleBlkNode(pExtW, &pTmp);
×
367
    }
368
  }
UNCOV
369
  taosMemoryFree(*ppList);
×
NEW
370
  *ppList = NULL;
×
371
}
372
static void extWinDestroyBlkNode(SExternalWindowOperator* pInfo, SListNode* pNode) {
1,496,598✔
373
  if (NULL == pNode) {
1,496,598✔
374
    return;
1,438,422✔
375
  }
376

377
  SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
58,176✔
378
  SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
58,176✔
379
  
380
  blockDataDestroy(pBlock);
58,176✔
381
  taosArrayDestroy(pIdx);
58,176✔
382

383
  taosMemoryFree(pNode);
58,176✔
384

385
  pInfo->stat.resBlockDestroyed++;
58,176✔
386
}
387

388

389
static void destroyExternalWindowOperatorInfo(void* param) {
1,439,230✔
390
  if (NULL == param) {
1,439,230✔
391
    return;
×
392
  }
393
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
1,439,230✔
394
  cleanupBasicInfo(&pInfo->binfo);
1,439,230✔
395

396
  taosArrayDestroyEx(pInfo->pOutputBlocks, extWinDestroyBlockList);
1,439,230✔
397
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,439,230✔
398
  taosArrayDestroy(pInfo->pWinRowIdx);
1,439,230✔
399
  
400
  taosArrayDestroy(pInfo->pPseudoColInfo);
1,439,230✔
401
  blockDataDestroy(pInfo->pTmpBlock);
1,439,230✔
402
  blockDataDestroy(pInfo->pEmptyInputBlock);
1,439,230✔
403

404
  extWinDestroyBlkNode(pInfo, pInfo->pLastBlkNode);
1,439,230✔
405
  if (pInfo->pFreeBlocks) {
1,439,230✔
406
    SListNode *node;
407
    while ((node = TD_DLIST_HEAD(pInfo->pFreeBlocks)) != NULL) {
107,868✔
408
      TD_DLIST_POP(pInfo->pFreeBlocks, node);
57,368✔
409
      extWinDestroyBlkNode(pInfo, node);
57,368✔
410
    }
411
    taosMemoryFree(pInfo->pFreeBlocks);
50,500✔
412
  }
413

414
  taosArrayDestroy(pInfo->pGrpIds);
1,439,230✔
415
  taosArrayDestroy(pInfo->pCTGrpIds);
1,439,230✔
416

417
  if (pInfo->ownTGrpCtx && pInfo->pTGrpCtx) {
1,439,230✔
418
    extWinDestroyTGrpCtx(pInfo->pTGrpCtx);
1,216,582✔
419
    taosMemoryFree(pInfo->pTGrpCtx);
1,216,582✔
420
    pInfo->pTGrpCtx = NULL;
1,216,582✔
421
  }
422

423
  // Free executor-specific pRunnerGrpCtx stored inside grouped calc info entries.
424
  // These contain SExtWinTrigGrpCtx which must be cleaned up here (before
425
  // doDestroyTask calls tDestroyStRtFuncInfo) since extWinDestroyTGrpCtx is
426
  // only accessible from this translation unit.
427
  if (pInfo->pTaskInfo != NULL && pInfo->pTaskInfo->pStreamRuntimeInfo != NULL) {
1,439,230✔
428
    SStreamRuntimeFuncInfo* pRt = &pInfo->pTaskInfo->pStreamRuntimeInfo->funcInfo;
399,556✔
429
    if (pRt->pGroupCalcInfos != NULL) {
399,556✔
430
      int32_t iter = 0;
70,296✔
431
      void*   pIter = NULL;
70,296✔
432
      while ((pIter = tSimpleHashIterate(pRt->pGroupCalcInfos, pIter, &iter)) != NULL) {
209,676✔
433
        SSTriggerGroupCalcInfo* pGrpCalc = (SSTriggerGroupCalcInfo*)pIter;
139,380✔
434
        if (pGrpCalc->pRunnerGrpCtx != NULL) {
139,380✔
435
          extWinDestroyTGrpCtx(pGrpCalc->pRunnerGrpCtx);
30,300✔
436
          taosMemoryFree(pGrpCalc->pRunnerGrpCtx);
30,300✔
437
          pGrpCalc->pRunnerGrpCtx = NULL;
30,300✔
438
        }
439
      }
440
    }
441
  }
442

443
  cleanupAggSup(&pInfo->aggSup);
1,439,230✔
444
  cleanupExprSupp(&pInfo->scalarSupp);
1,439,230✔
445
  for (int32_t i = 0; i < pInfo->resultRows.resRowsSize; ++i) {
15,395,210✔
446
    if (pInfo->resultRows.pResultRows && pInfo->resultRows.pResultRows[i]) {
13,955,980✔
447
      taosMemoryFreeClear(pInfo->resultRows.pResultRows[i]);
1,202,489✔
448
    }
449
  }
450
  taosMemoryFreeClear(pInfo->resultRows.pResultRows);
1,439,230✔
451
  
452
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
1,439,230✔
453

454
  qDebug("ext window stat at destroy, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
1,439,230✔
455
      pInfo->stat.resBlockCreated, pInfo->stat.resBlockDestroyed, pInfo->stat.resBlockRecycled, 
456
      pInfo->stat.resBlockReused, pInfo->stat.resBlockAppend);
457

458
  taosMemoryFreeClear(pInfo);
1,439,230✔
459
}
460

461
static int32_t extWinOpen(SOperatorInfo* pOperator);
462
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
463
static int32_t mergeAlignExtWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
464

465
static FORCE_INLINE void extWinIntersectTimeRange(STimeWindow* pDst, const STimeWindow* pRange) {
466
  if (pDst == NULL || pRange == NULL) {
687,608✔
NEW
467
    return;
×
468
  }
469

470
  pDst->skey = TMAX(pDst->skey, pRange->skey);
687,608✔
471
  pDst->ekey = TMIN(pDst->ekey, pRange->ekey);
687,608✔
472
}
473

474
static void extWinApplyTimeRangeToTableScan(SOperatorInfo* pScanOp, const STimeWindow* pTimeRange) {
343,804✔
475
  if (pScanOp == NULL || pScanOp->info == NULL || pTimeRange == NULL) {
343,804✔
NEW
476
    return;
×
477
  }
478

479
  STableScanInfo* pScanInfo = (STableScanInfo*)pScanOp->info;
343,804✔
480
  extWinIntersectTimeRange(&pScanInfo->base.cond.twindows, pTimeRange);
343,804✔
481
  extWinIntersectTimeRange(&pScanInfo->base.orgCond.twindows, pTimeRange);
343,804✔
482
}
483

484
static void extWinApplyTimeRangeToExchangeParam(SOperatorParam* pParam, const STimeWindow* pTimeRange) {
23,432✔
485
  if (pParam == NULL || pParam->value == NULL || pTimeRange == NULL) {
23,432✔
486
    return;
23,432✔
487
  }
488

NEW
489
  SExchangeOperatorParam* pExcParam = (SExchangeOperatorParam*)pParam->value;
×
NEW
490
  if (pExcParam->multiParams) {
×
NEW
491
    return;
×
492
  }
493

NEW
494
  if (pExcParam->basic.paramType == 0) {
×
NEW
495
    pExcParam->basic.paramType = DYN_TYPE_EXCHANGE_PARAM;
×
496
  }
NEW
497
  extWinIntersectTimeRange(&pExcParam->basic.window, pTimeRange);
×
498
}
499

500
static int32_t extWinApplyNonStreamTimeRangeToOperatorTree(SOperatorInfo* pOperator, const STimeWindow* pTimeRange,
393,496✔
501
                                                           int32_t* pScanAppliedNum, SExecTaskInfo* pRootTaskInfo) {
502
  if (pOperator == NULL || pTimeRange == NULL || pScanAppliedNum == NULL || pRootTaskInfo == NULL) {
393,496✔
NEW
503
    return TSDB_CODE_INVALID_PARA;
×
504
  }
505

506
  if (pOperator->pTaskInfo != pRootTaskInfo) {
393,496✔
NEW
507
    return TSDB_CODE_SUCCESS;
×
508
  }
509

510
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
393,496✔
511
    extWinApplyTimeRangeToTableScan(pOperator, pTimeRange);
343,804✔
512
    ++(*pScanAppliedNum);
343,804✔
513
  }
514

515
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
393,496✔
516
    extWinApplyTimeRangeToExchangeParam(pOperator->pOperatorGetParam, pTimeRange);
23,432✔
517
  }
518

519
  if (pOperator->numOfDownstream <= 0 || pOperator->pDownstream == NULL) {
393,496✔
520
    return TSDB_CODE_SUCCESS;
367,236✔
521
  }
522

523
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
65,852✔
524
    SOperatorInfo* pChild = pOperator->pDownstream[i];
39,592✔
525
    if (pChild == NULL) {
39,592✔
NEW
526
      continue;
×
527
    }
528

529
    if (pOperator->pDownstreamGetParams != NULL &&
39,592✔
NEW
530
        pChild->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
×
NEW
531
      extWinApplyTimeRangeToExchangeParam(pOperator->pDownstreamGetParams[i], pTimeRange);
×
532
    }
533

534
    int32_t code = extWinApplyNonStreamTimeRangeToOperatorTree(pChild, pTimeRange, pScanAppliedNum, pRootTaskInfo);
39,592✔
535
    if (code != TSDB_CODE_SUCCESS) {
39,592✔
NEW
536
      return code;
×
537
    }
538
  }
539

540
  return TSDB_CODE_SUCCESS;
26,260✔
541
}
542

543
static int32_t extWinApplyNonStreamTimeRangeToDownstream(SOperatorInfo* pOperator, const STimeWindow* pTimeRange) {
1,395,598✔
544
  if (pOperator == NULL || pTimeRange == NULL) {
1,395,598✔
NEW
545
    return TSDB_CODE_INVALID_PARA;
×
546
  }
547

548
  if (pTimeRange->skey == INT64_MAX || pTimeRange->ekey == INT64_MIN || pTimeRange->skey > pTimeRange->ekey) {
1,395,598✔
549
    return TSDB_CODE_SUCCESS;
1,041,694✔
550
  }
551

552
  if (pOperator->numOfDownstream <= 0 || pOperator->pDownstream == NULL) {
353,904✔
NEW
553
    return TSDB_CODE_SUCCESS;
×
554
  }
555

556
  int32_t scanAppliedNum = 0;
353,904✔
557
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
707,808✔
558
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
353,904✔
559
    if (pDownstream == NULL) {
353,904✔
NEW
560
      continue;
×
561
    }
562

563
    int32_t code = extWinApplyNonStreamTimeRangeToOperatorTree(pDownstream, pTimeRange, &scanAppliedNum,
353,904✔
564
                                                                pOperator->pTaskInfo);
565
    if (code != TSDB_CODE_SUCCESS) {
353,904✔
NEW
566
      return code;
×
567
    }
568
  }
569

570
  qInfo("%s apply non-stream extWin timerange:[%" PRId64 ", %" PRId64 "] to downstream tree, tableScanCnt:%d",
353,904✔
571
        GET_TASKID(pOperator->pTaskInfo), pTimeRange->skey, pTimeRange->ekey, scanAppliedNum);
572

573
  return TSDB_CODE_SUCCESS;
353,904✔
574
}
575

576
typedef struct SMergeAlignedExternalWindowOperator {
577
  SExternalWindowOperator* pExtW;
578
  int64_t curTs;
579
  SResultRow*  pResultRow;
580
  SSDataBlock* pNewGroup;
581
  int32_t lastFinalizedWinIdx;  // tracks last emitted window index for filling empty-window gaps (vtable COLS merge)
582
} SMergeAlignedExternalWindowOperator;
583

584
static int32_t extWinInitNonStreamWindowDataFromBlock(SExternalWindowPhysiNode* pPhynode, SExecTaskInfo* pTaskInfo,
585
                                                       STimeWindow* pTimeRange);
586

587
static void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
42,824✔
588
  SMergeAlignedExternalWindowOperator* pMAExtW = (SMergeAlignedExternalWindowOperator*)pOperator;
42,824✔
589
  destroyExternalWindowOperatorInfo(pMAExtW->pExtW);
42,824✔
590
  taosMemoryFreeClear(pMAExtW);
42,824✔
591
}
42,824✔
592

593
static int64_t* extWinExtractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
4,451,786✔
594
  TSKEY* tsCols = NULL;
4,451,786✔
595

596
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
4,451,786✔
597
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
4,451,786✔
598
    if (!pColDataInfo) {
4,451,786✔
599
      pTaskInfo->code = terrno;
×
600
      T_LONG_JMP(pTaskInfo->env, terrno);
×
601
    }
602

603
    tsCols = (int64_t*)pColDataInfo->pData;
4,451,786✔
604
    if (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0) {
4,451,786✔
605
      int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
4,162,118✔
606
      if (code != TSDB_CODE_SUCCESS) {
4,162,118✔
607
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
608
        pTaskInfo->code = code;
×
609
        T_LONG_JMP(pTaskInfo->env, code);
×
610
      }
611
    }
612
  }
613

614
  return tsCols;
4,451,786✔
615
}
616

617
static FORCE_INLINE SExternalWindowOperator* extWinGetCoreInfo(SOperatorInfo* pOperator) {
618
  if (pOperator == NULL) {
76,304,288✔
NEW
619
    return NULL;
×
620
  }
621

622
  // getNextFn is an execution-record wrapper; _nextFn is the real operator callback.
623
  if (pOperator->fpSet._nextFn == mergeAlignExtWinNext) {
76,304,288✔
624
    SMergeAlignedExternalWindowOperator* pMAExtW = pOperator->info;
8,809,220✔
625
    return (pMAExtW != NULL) ? pMAExtW->pExtW : NULL;
8,809,220✔
626
  }
627

628
  return pOperator->info;
67,486,988✔
629
}
630

631
static FORCE_INLINE SExtWinCalcGrpCtx* extWinGetScopedCalcGrpCtx(SOperatorInfo* pOperator) {
632
  SExternalWindowOperator* pExtW = extWinGetCoreInfo(pOperator);
76,301,056✔
633
  if (pExtW == NULL || pExtW->pTGrpCtx == NULL || pExtW->pTGrpCtx->pCCtx == NULL) {
76,301,056✔
634
    return NULL;
148,268✔
635
  }
636

637
  // In partitioned external-window queries, different calc-groups may share
638
  // the same trigger-group. Their window cursors must stay isolated per
639
  // calc-group instead of falling back to the task-global cursor.
640
  if (pExtW->calcWithPartition) {
76,153,596✔
641
    return pExtW->pTGrpCtx->pCCtx;
41,805,112✔
642
  }
643

644
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
34,318,184✔
645
  if (pTaskInfo != NULL && pTaskInfo->pStreamRuntimeInfo != NULL) {
34,348,484✔
646
    SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
34,348,080✔
647
    if (pInfo->isMultiGroupCalc) {
34,348,080✔
NEW
648
      return pExtW->pTGrpCtx->pCCtx;
×
649
    }
650
  }
651

652
  return NULL;
34,348,484✔
653
}
654

655
static int32_t extWinGetCurWinIdx(SOperatorInfo* pOperator) {
2,147,483,647✔
656
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,147,483,647✔
657
  if (!pTaskInfo->pStreamRuntimeInfo) {
2,147,483,647✔
658
    return 0;
2,147,483,647✔
659
  }
660
  SExtWinCalcGrpCtx* pCCtx = extWinGetScopedCalcGrpCtx(pOperator);
42,671,692✔
661
  if (pCCtx != NULL) {
42,671,692✔
662
    return pCCtx->curIdx;
21,060,520✔
663
  }
664

665
  return pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
21,611,172✔
666
}
667

668
static void extWinSetCurWinIdx(SOperatorInfo* pOperator, int32_t idx) {
2,147,483,647✔
669
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,147,483,647✔
670
  if (pTaskInfo->pStreamRuntimeInfo) {
2,147,483,647✔
671
    SExtWinCalcGrpCtx* pCCtx = extWinGetScopedCalcGrpCtx(pOperator);
29,541,692✔
672
    if (pCCtx != NULL) {
29,541,692✔
673
      pCCtx->curIdx = idx;
16,693,280✔
674
    }
675
    pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
29,541,692✔
676
  }
677
}
2,147,483,647✔
678

679

680
static void extWinIncCurWinOutIdx(SOperatorInfo* pOperator) {
4,088,884✔
681
  if (pOperator == NULL || pOperator->pTaskInfo->pStreamRuntimeInfo == NULL) {
4,088,884✔
NEW
682
    return;
×
683
  }
684

685
  SExtWinCalcGrpCtx* pCCtx = extWinGetScopedCalcGrpCtx(pOperator);
4,088,884✔
686
  if (pCCtx != NULL) {
4,088,884✔
687
    pCCtx->curIdx++;
4,039,596✔
688
  }
689
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx++;
4,101,004✔
690
}
691

692

693
static int32_t extWinAppendWinIdx(SExecTaskInfo*       pTaskInfo, SArray* pIdx, SSDataBlock* pBlock, int32_t currWinIdx, int32_t rows) {
2,147,483,647✔
694
  int32_t  code = 0, lino = 0;
2,147,483,647✔
695
  int64_t* lastRes = taosArrayGetLast(pIdx);
2,147,483,647✔
696
  int32_t* lastWinIdx = (int32_t*)lastRes;
2,147,483,647✔
697
  int32_t* lastRowIdx = lastWinIdx ? (lastWinIdx + 1) : NULL;
2,147,483,647✔
698
  int64_t  res = 0;
2,147,483,647✔
699
  int32_t* pWinIdx = (int32_t*)&res;
2,147,483,647✔
700
  int32_t* pRowIdx = pWinIdx + 1;
2,147,483,647✔
701

702
  if (lastWinIdx && *lastWinIdx == currWinIdx) {
2,147,483,647✔
703
    return code;
2,147,483,647✔
704
  }
705

706
  *pWinIdx = currWinIdx;
427,453,946✔
707
  *pRowIdx = pBlock->info.rows - rows;
427,450,714✔
708

709
  TSDB_CHECK_NULL(taosArrayPush(pIdx, &res), code, lino, _exit, terrno);
427,454,350✔
710

711
_exit:
427,454,350✔
712

713
  if (code) {
427,453,542✔
714
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
715
  }
716

717
  return code;
427,451,926✔
718
}
719

720
static int32_t extWinRebuildWinIdxByFilter(SExecTaskInfo* pTaskInfo, SArray* pIdx, int32_t rowsBeforeFilter,
2,424✔
721
                                           SColumnInfoData* pFilterRes) {
722
  int32_t code = TSDB_CODE_SUCCESS;
2,424✔
723
  int32_t lino = 0;
2,424✔
724

725
  if (pIdx == NULL || pFilterRes == NULL || rowsBeforeFilter <= 0 || taosArrayGetSize(pIdx) == 0) {
2,424✔
726
    return code;
×
727
  }
728

729
  int32_t idxSize = (int32_t)taosArrayGetSize(pIdx);
2,424✔
730
  SArray* pNewIdx = taosArrayInit(idxSize, sizeof(int64_t));
2,424✔
731
  TSDB_CHECK_NULL(pNewIdx, code, lino, _exit, terrno);
2,424✔
732

733
  int8_t* pIndicator = (int8_t*)pFilterRes->pData;
2,424✔
734
  int32_t newRowStart = 0;
2,424✔
735
  for (int32_t i = 0; i < idxSize; ++i) {
12,120✔
736
    int64_t cur = *(int64_t*)taosArrayGet(pIdx, i);
9,696✔
737
    int32_t* pCurWinIdx = (int32_t*)&cur;
9,696✔
738
    int32_t* pCurRowIdx = pCurWinIdx + 1;
9,696✔
739

740
    int32_t startRow = *pCurRowIdx;
9,696✔
741
    int32_t endRow = rowsBeforeFilter;
9,696✔
742
    if (i + 1 < idxSize) {
9,696✔
743
      int64_t next = *(int64_t*)taosArrayGet(pIdx, i + 1);
7,272✔
744
      int32_t* pNextWinIdx = (int32_t*)&next;
7,272✔
745
      int32_t* pNextRowIdx = pNextWinIdx + 1;
7,272✔
746
      endRow = *pNextRowIdx;
7,272✔
747
    }
748

749
    startRow = TMIN(TMAX(startRow, 0), rowsBeforeFilter);
9,696✔
750
    endRow = TMIN(TMAX(endRow, 0), rowsBeforeFilter);
9,696✔
751
    if (endRow <= startRow) {
9,696✔
752
      continue;
×
753
    }
754

755
    int32_t survivedRows = 0;
9,696✔
756
    for (int32_t r = startRow; r < endRow; ++r) {
19,392✔
757
      if (pIndicator[r]) {
9,696✔
758
        survivedRows++;
5,252✔
759
      }
760
    }
761

762
    if (survivedRows <= 0) {
9,696✔
763
      continue;
4,444✔
764
    }
765

766
    int64_t out = 0;
5,252✔
767
    int32_t* pOutWinIdx = (int32_t*)&out;
5,252✔
768
    int32_t* pOutRowIdx = pOutWinIdx + 1;
5,252✔
769
    *pOutWinIdx = *pCurWinIdx;
5,252✔
770
    *pOutRowIdx = newRowStart;
5,252✔
771
    TSDB_CHECK_NULL(taosArrayPush(pNewIdx, &out), code, lino, _exit, terrno);
5,252✔
772

773
    newRowStart += survivedRows;
5,252✔
774
  }
775

776
  taosArrayClear(pIdx);
2,424✔
777
  int32_t newSize = (int32_t)taosArrayGetSize(pNewIdx);
2,424✔
778
  if (newSize > 0) {
2,424✔
779
    void* dest = taosArrayReserve(pIdx, newSize);
2,424✔
780
    TSDB_CHECK_NULL(dest, code, lino, _exit, terrno);
2,424✔
781
    memcpy(dest, pNewIdx->pData, (size_t)newSize * sizeof(int64_t));
2,424✔
782
  }
783

UNCOV
784
_exit:
×
785
  taosArrayDestroy(pNewIdx);
2,424✔
786
  if (code != TSDB_CODE_SUCCESS) {
2,424✔
787
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
788
  }
789
  return code;
2,424✔
790
}
791

792

793
static int32_t extWinInitCGrpCtx(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo, SExtWinCalcGrpCtx* pCtx) {
340,168✔
794
  int32_t code = 0, lino = 0;
340,168✔
795

796
  pCtx->groupId = 0;
340,168✔
797
  pCtx->curIdx = 0;
340,168✔
798
  pCtx->lastSKey = INT64_MIN;
340,168✔
799
  pCtx->lastEKey = INT64_MAX;
340,168✔
800
  pCtx->lastWinId = -1;
340,168✔
801
  pCtx->lastWinIdx = -1;
340,168✔
802
  pCtx->blkWinIdx = -1;
340,168✔
803
  pCtx->blkWinStartSet = false;
340,168✔
804
  pCtx->blkWinStartIdx = 0;
340,168✔
805
  pCtx->blkRowStartIdx = 0;
340,168✔
806
  pCtx->outWinIdx = 0;
340,168✔
807
  pCtx->outWinLastIdx = -1;
340,168✔
808
  pCtx->outWinTotalNum = 0;
340,168✔
809
  pCtx->outWinNum = 0;
340,168✔
810
  pCtx->pWins = NULL;
340,168✔
811
  pCtx->outWinBufIdx = NULL;
340,168✔
812
  
813
  SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
340,168✔
814

815
  if (pInfo->isMultiGroupCalc) {
340,168✔
816
    pInfo->pStreamPesudoFuncVals = pInfo->curGrpCalc->pParams;
30,300✔
817
    pInfo->pStreamPartColVals = pInfo->curGrpCalc->pGroupColVals;
30,300✔
818
  }
819

820
  size_t size = taosArrayGetSize(pInfo->pStreamPesudoFuncVals);
340,168✔
821
  pCtx->pWins = taosArrayInit_s(sizeof(SExtWinTimeWindow), size);
340,168✔
822
  TSDB_CHECK_NULL(pCtx->pWins, code, lino, _exit, terrno);
340,168✔
823
  //pCtx->outWinBufIdx = taosArrayInit_s(sizeof(int32_t), size);
824
  //TSDB_CHECK_NULL(pCtx->outWinBufIdx, code, lino, _exit, terrno);
825

826
  SExtWinTimeWindow* pWin = taosArrayGet(pCtx->pWins, 0);
340,168✔
827

828
  if (pExtW->isMergeAlignedExtW) {
340,168✔
829
    for (int32_t i = 0; i < size; ++i) {
2,253,108✔
830
      SSTriggerCalcParam* pParam = taosArrayGet(pInfo->pStreamPesudoFuncVals, i);
2,204,628✔
831

832
      pWin[i].tw.skey = pParam->wstart;
2,204,628✔
833
      pWin[i].tw.ekey = pParam->wstart + 1;
2,204,628✔
834
      pWin[i].resWinIdx = -1;
2,204,628✔
835
    }
836
  } else if (pExtW->timeRangeExpr && pExtW->timeRangeExpr->needCalc) {
291,688✔
NEW
837
    TAOS_CHECK_EXIT(scalarCalculateExtWinsTimeRange(pExtW->timeRangeExpr, pInfo, pWin));
×
NEW
838
    for (int32_t i = 0; i < size; ++i) {
×
NEW
839
      pWin[i].resWinIdx = -1;
×
840
    }
NEW
841
    if (qDebugFlag & DEBUG_DEBUG) {
×
NEW
842
      for (int32_t i = 0; i < size; ++i) {
×
NEW
843
        qDebug("%s the %d/%d ext window calced initialized, TR[%" PRId64 ", %" PRId64 ")", 
×
844
            pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
845
      }
846
    }
847
  } else {
848
    for (int32_t i = 0; i < size; ++i) {
21,082,740✔
849
      SSTriggerCalcParam* pParam = taosArrayGet(pInfo->pStreamPesudoFuncVals, i);
20,791,052✔
850

851
      pWin[i].tw.skey = pParam->wstart;
20,791,052✔
852
      pWin[i].tw.ekey = pParam->wend + ((pInfo->triggerType != STREAM_TRIGGER_SLIDING) ? 1 : 0);
20,791,052✔
853
      pWin[i].resWinIdx = -1;
20,791,052✔
854

855
      qDebug("%s the %d/%d ext window initialized, TR[%" PRId64 ", %" PRId64 ")", 
20,791,052✔
856
          pTaskInfo->id.str, i, (int32_t)size, pWin[i].tw.skey, pWin[i].tw.ekey);
857
    }
858
  }
859

860
_exit:
291,688✔
861

862
  if (code) {
340,168✔
NEW
863
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
×
864
  }
865
  
866
  return code;
340,168✔
867
}
868

869
static void extWinInitDynParamCGrpCtx(SExtWinCalcGrpCtx* pCtx) {
2,127,269✔
870
  pCtx->groupId = 0;
2,127,269✔
871
  pCtx->curIdx = 0;
2,127,269✔
872
  pCtx->lastSKey = INT64_MIN;
2,127,269✔
873
  pCtx->lastWinId = -1;
2,127,269✔
874
  pCtx->lastWinIdx = -1;
2,127,269✔
875
  pCtx->blkWinIdx = -1;
2,127,269✔
876
  pCtx->blkWinStartSet = false;
2,127,269✔
877
  pCtx->blkWinStartIdx = 0;
2,127,269✔
878
  pCtx->blkRowStartIdx = 0;
2,127,269✔
879
  pCtx->outWinIdx = 0;
2,127,269✔
880
  pCtx->outWinLastIdx = -1;
2,127,269✔
881
  pCtx->outWinTotalNum = 0;
2,127,269✔
882
  pCtx->outWinNum = 0;
2,127,269✔
883
  pCtx->pWins = NULL;
2,127,269✔
884
  pCtx->outWinBufIdx = NULL;
2,127,269✔
885
}
2,127,269✔
886

887
static int32_t extWinSwitchInitTGrpCtx(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo, SBlockID* pId) {
354,308✔
888
  int32_t code = 0, lino = 0;
354,308✔
889
  SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
354,308✔
890

891
  if ((!pInfo->isMultiGroupCalc && NULL != pExtW->pTGrpCtx) ||
354,308✔
892
      (pInfo->isMultiGroupCalc && NULL != pExtW->pTGrpCtx && pId->baseGId == pExtW->lastTGrpId)) {
210,080✔
893
    goto _exit;
145,844✔
894
  }
895
  
896
  if (pInfo->isMultiGroupCalc) {
208,464✔
897
    pInfo->curGrpCalc = tSimpleHashGet(pInfo->pGroupCalcInfos, &pId->baseGId, sizeof(pId->baseGId));
30,300✔
898
    if (NULL == pInfo->curGrpCalc) {
30,300✔
NEW
899
      qError("%s %s failed to get %s extWin tgrp %" PRIu64 " calc info", 
×
900
        GET_TASKID(pTaskInfo), __func__, EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW), pId->baseGId);
NEW
901
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
902
    }
903

904
    if (NULL == pInfo->curGrpCalc->pRunnerGrpCtx) {
30,300✔
905
      pInfo->curGrpCalc->pRunnerGrpCtx = taosMemoryCalloc(1, sizeof(SExtWinTrigGrpCtx));
30,300✔
906
      TSDB_CHECK_NULL(pInfo->curGrpCalc->pRunnerGrpCtx, code, lino, _exit, terrno);
30,300✔
907
      
908
      if (!pExtW->calcWithPartition && pExtW->needGroupSort) {
30,300✔
NEW
909
        if (NULL == pExtW->pGrpIds) {
×
NEW
910
          pExtW->pGrpIds = taosArrayInit(1024, sizeof(uint64_t));
×
NEW
911
          TSDB_CHECK_NULL(pExtW->pGrpIds, code, lino, _exit, terrno);
×
912
        }
913
        
NEW
914
        TSDB_CHECK_NULL(taosArrayPush(pExtW->pGrpIds, &pId->baseGId), code, lino, _exit, terrno);
×
915
      }
916
    } else {
NEW
917
      pInfo->pStreamPesudoFuncVals = pInfo->curGrpCalc->pParams;
×
NEW
918
      pInfo->pStreamPartColVals = pInfo->curGrpCalc->pGroupColVals;
×
919
    }
920
    
921
    pInfo->groupId = pId->baseGId;
30,300✔
922
    pExtW->lastTGrpId = pId->baseGId;
30,300✔
923
    pExtW->pTGrpCtx = pInfo->curGrpCalc->pRunnerGrpCtx;
30,300✔
924
    pExtW->ownTGrpCtx = false;
30,300✔
925

926
    qDebug("%s %s extWin switch to tgrp %" PRIu64, 
30,300✔
927
      GET_TASKID(pTaskInfo), EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW), pId->baseGId);
928

929
    goto _exit;
30,300✔
930
  }
931

932
  pExtW->pTGrpCtx = taosMemoryCalloc(1, sizeof(SExtWinTrigGrpCtx));
178,164✔
933
  TSDB_CHECK_NULL(pExtW->pTGrpCtx, code, lino, _exit, terrno);
178,164✔
934
  pExtW->ownTGrpCtx = true;
178,164✔
935

936
_exit:
354,308✔
937

938
  if (code) {
354,308✔
NEW
939
    qError("%s %s %s extWin failed at line %d since %s", 
×
940
      GET_TASKID(pTaskInfo), __func__, EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW), lino, tstrerror(code));
941
  }
942
  
943
  return code;
354,308✔
944
}
945

946
static FORCE_INLINE bool extWinNeedResolvePartitionBlockId(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
947
                                                           const SBlockID* pId) {
948
  if (pId == NULL || pTaskInfo == NULL || pTaskInfo->pStreamRuntimeInfo == NULL) {
412,888✔
NEW
949
    return false;
×
950
  }
951

952
  SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
412,888✔
953
  return pInfo->isMultiGroupCalc && pExtW->calcWithPartition && pInfo->pGroupCalcInfos != NULL;
412,888✔
954
}
955

956
static void extWinResolveBaseGroupIdForPartition(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
35,956✔
957
                                                 SBlockID* pId) {
958
  SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
35,956✔
959

960
  if (pId->baseGId != 0) {
35,956✔
961
    return;
33,128✔
962
  }
963

964
  if (pId->groupId != 0 &&
5,656✔
965
      tSimpleHashGet(pInfo->pGroupCalcInfos, &pId->groupId, sizeof(pId->groupId)) != NULL) {
2,828✔
966
    pId->baseGId = pId->groupId;
2,828✔
967
    qDebug("%s %s normalize baseGId <- groupId %" PRIu64,
2,828✔
968
           GET_TASKID(pTaskInfo), EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW), pId->groupId);
969
    return;
2,828✔
970
  }
971

972
  // Compatibility path: some non-stream partitioned external-window plans may
973
  // still send blocks with only `groupId` or with both ids unset on the outer
974
  // side, while runtime has exactly one trigger-group from the subquery. In
975
  // that case we can safely recover `baseGId` from the singleton trigger-group.
976
  //
977
  // Typical SQL shape:
978
  //   select tbname, cast(_wstart as bigint) as ws, cast(ts as bigint) as ts64
979
  //   from ext_cx_src partition by tbname
980
  //   external_window((select ts, endtime, mark from ext_cx_win) w);
981
  //
982
  // Here the outer query is partitioned (`partition by tbname`), but the
983
  // subquery has no partition/group clause, so upstream may not fully carry
984
  // `baseGId` even though there is only one trigger-group to bind against.
NEW
985
  int32_t size = tSimpleHashGetSize(pInfo->pGroupCalcInfos);
×
NEW
986
  if (size == 1) {
×
NEW
987
    int32_t iter = 0;
×
NEW
988
    SSTriggerGroupCalcInfo* pOne = tSimpleHashIterate(pInfo->pGroupCalcInfos, NULL, &iter);
×
NEW
989
    if (pOne != NULL) {
×
NEW
990
      pId->baseGId = *(uint64_t*)tSimpleHashGetKey(pOne, NULL);
×
NEW
991
      qDebug("%s %s normalize baseGId <- single gid %" PRIu64,
×
992
             GET_TASKID(pTaskInfo), EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW), pId->baseGId);
993
    }
994
  }
995
}
996

997
static void extWinResolveCalcGroupIdForPartition(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
35,956✔
998
                                                 SBlockID* pId) {
999
  if (pId->groupId == 0 && pId->baseGId != 0) {
35,956✔
NEW
1000
    pId->groupId = pId->baseGId;
×
NEW
1001
    qDebug("%s %s normalize groupId <- baseGId %" PRIu64,
×
1002
           GET_TASKID(pTaskInfo), EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW), pId->groupId);
1003
  }
1004
}
35,956✔
1005

1006
static void extWinResolveBlockIdForPartition(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo, SBlockID* pId) {
412,888✔
1007
  if (!extWinNeedResolvePartitionBlockId(pExtW, pTaskInfo, pId)) {
412,888✔
1008
    return;
376,932✔
1009
  }
1010

1011
  extWinResolveBaseGroupIdForPartition(pExtW, pTaskInfo, pId);
35,956✔
1012
  extWinResolveCalcGroupIdForPartition(pExtW, pTaskInfo, pId);
35,956✔
1013
}
1014

1015

1016
static int32_t extWinSwitchInitCGrpCtx(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo, SBlockID* pId) {
354,308✔
1017
  int32_t code = 0, lino = 0;
354,308✔
1018
  SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
354,308✔
1019
  SExtWinTrigGrpCtx* pTCtx = pExtW->pTGrpCtx;
354,308✔
1020

1021
  if (pTCtx == NULL) {
354,308✔
NEW
1022
    qError("%s %s invalid tgrp ctx for %s extWin, baseGrp:%" PRIu64 " grp:%" PRIu64,
×
1023
           GET_TASKID(pTaskInfo), __func__, EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW),
1024
           pId->baseGId, pId->groupId);
NEW
1025
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
×
1026
  }
1027

1028
  // In partitioned multi-group mode, only blocks whose C-group (groupId)
1029
  // matches the current T-group (baseGId) should proceed. Treat mismatches
1030
  // like "no cgrp" here; extWinOpen has an early filter to skip such blocks.
1031
  if (0 == pId->groupId || (pInfo->isMultiGroupCalc && (pId->baseGId != pId->groupId))) {
354,308✔
1032
    if (NULL != pTCtx->pCGCtxs) {
127,260✔
NEW
1033
      qError("%s plan or ctx conflict, pCGCtxs:%p baseGrp:%" PRIu64 " grp:%" PRIu64
×
1034
             " lastCGrp:%" PRIu64 " isMulti:%d calcWithPart:%d",
1035
             GET_TASKID(pTaskInfo), pTCtx->pCGCtxs, pId->baseGId, pId->groupId,
1036
             pExtW->lastCGrpId, pInfo->isMultiGroupCalc, pExtW->calcWithPartition);
NEW
1037
      TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
×
1038
    }
1039

1040
    if (NULL == pTCtx->pCCtx) {
127,260✔
1041
      pTCtx->pCCtx = taosMemoryCalloc(1, sizeof(*pTCtx->pCCtx));
117,160✔
1042
      TSDB_CHECK_NULL(pTCtx->pCCtx, code, lino, _exit, terrno);
117,160✔
1043
      TAOS_CHECK_EXIT(extWinInitCGrpCtx(pExtW, pTaskInfo, pTCtx->pCCtx));
117,160✔
1044
    }
1045

1046
    qDebug("%s ext win switch to no cgrp", EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW));
127,260✔
1047
    
1048
    goto _exit;
127,260✔
1049
  }
1050

1051
  if (pId->groupId == pExtW->lastCGrpId) {
227,048✔
1052
    if (pTCtx->pCCtx != NULL) {
4,040✔
1053
      pTCtx->pCCtx->groupId = pId->groupId;
4,040✔
1054
    }
1055
    qDebug("%s ext win continue cgrp %" PRIu64, EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW), pId->groupId);
4,040✔
1056
    goto _exit;
4,040✔
1057
  }
1058

1059
  pExtW->lastCGrpId = pId->groupId;
223,008✔
1060
  
1061
  if (NULL == pTCtx->pCGCtxs) {
223,008✔
1062
    if (NULL != pTCtx->pCCtx) {
91,304✔
NEW
1063
      TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
×
1064
    }
1065
    
1066
    pTCtx->pCGCtxs = tSimpleHashInit(EXT_WIN_CALC_GROUP_SIZE, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
91,304✔
1067
    TSDB_CHECK_NULL(pTCtx->pCGCtxs, code, lino, _exit, terrno);
91,304✔
1068
  }
1069

1070
  pTCtx->pCCtx = tSimpleHashGet(pTCtx->pCGCtxs, &pId->groupId, sizeof(pId->groupId));
223,008✔
1071
  if (NULL == pTCtx->pCCtx) {
223,008✔
1072
    SExtWinCalcGrpCtx tmp = {0};
223,008✔
1073
    TAOS_CHECK_EXIT(tSimpleHashPut(pTCtx->pCGCtxs, &pId->groupId, sizeof(pId->groupId), &tmp, sizeof(tmp)));
223,008✔
1074
    pTCtx->pCCtx = tSimpleHashGet(pTCtx->pCGCtxs, &pId->groupId, sizeof(pId->groupId));
223,008✔
1075
    TAOS_CHECK_EXIT(extWinInitCGrpCtx(pExtW, pTaskInfo, pTCtx->pCCtx));
223,008✔
1076

1077
    if (pExtW->needGroupSort) {
223,008✔
1078
      if (pInfo->isMultiGroupCalc) {
26,664✔
1079
        if (NULL == pExtW->pCTGrpIds) {
5,656✔
1080
          pExtW->pCTGrpIds = taosArrayInit(1024, sizeof(uint64_t) * 2);
4,444✔
1081
          TSDB_CHECK_NULL(pExtW->pCTGrpIds, code, lino, _exit, terrno);
4,444✔
1082
        }
1083
        
1084
        TSDB_CHECK_NULL(taosArrayPush(pExtW->pCTGrpIds, &pId->groupId), code, lino, _exit, terrno);
11,312✔
1085
      } else {
1086
        if (NULL == pExtW->pGrpIds) {
21,008✔
1087
          pExtW->pGrpIds = taosArrayInit(1024, sizeof(uint64_t));
15,352✔
1088
          TSDB_CHECK_NULL(pExtW->pGrpIds, code, lino, _exit, terrno);
15,352✔
1089
        }
1090
        
1091
        TSDB_CHECK_NULL(taosArrayPush(pExtW->pGrpIds, &pId->groupId), code, lino, _exit, terrno);
42,016✔
1092
      }
1093
    }
1094
  }
1095
  if (pTCtx->pCCtx != NULL) {
223,008✔
1096
    pTCtx->pCCtx->groupId = pId->groupId;
223,008✔
1097
  }
1098
  
1099
  qDebug("%s ext win switch to cgrp %" PRIu64, EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW), pId->groupId);
223,008✔
1100

NEW
1101
_exit:
×
1102

1103
  if (code) {
354,308✔
NEW
1104
    qError("%s %s %s ext win failed at line %d since %s", 
×
1105
      GET_TASKID(pTaskInfo), __func__, EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW), lino, tstrerror(code));
1106
  }
1107
  
1108
  return code;
354,308✔
1109
}
1110

1111
static int32_t extWinSwitchInitCtxs(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo, SBlockID* pId) {
4,451,786✔
1112
  int32_t code = 0, lino = 0;
4,451,786✔
1113

1114
  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
4,451,786✔
1115
    return TSDB_CODE_SUCCESS;
4,097,478✔
1116
  }
1117

1118
  extWinResolveBlockIdForPartition(pExtW, pTaskInfo, pId);
354,308✔
1119
  TAOS_CHECK_EXIT(extWinSwitchInitTGrpCtx(pExtW, pTaskInfo, pId));
354,308✔
1120
  TAOS_CHECK_EXIT(extWinSwitchInitCGrpCtx(pExtW, pTaskInfo, pId));
354,308✔
1121

1122
_exit:
354,308✔
1123

1124
  if (code) {
354,308✔
NEW
1125
    qError("%s %s %s ext win failed at line %d since %s", 
×
1126
      GET_TASKID(pTaskInfo), __func__, EXT_WIN_TYPE_STR(pExtW->isMergeAlignedExtW), lino, tstrerror(code));
1127
  }
1128

1129
  return code;
354,308✔
1130
}
1131

1132

NEW
1133
static void extWinResetResultRows(SExtWinResultRows* pRows) {
×
NEW
1134
  pRows->resRowsIdx = 0;
×
NEW
1135
  pRows->resRowIdx = 0;
×
NEW
1136
}
×
1137

1138
static int32_t mergeAlignExtWinSetOutputBuf(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
2,189,276✔
1139
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
1140
  if (*pResult == NULL) {
2,189,276✔
1141
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
42,016✔
1142
    if (!*pResult) {
42,016✔
NEW
1143
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
NEW
1144
      return terrno;
×
1145
    }
1146
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
42,016✔
1147
  }
1148
  
1149
  (*pResult)->win = *pWin;
2,189,276✔
1150
  (*pResult)->winIdx = extWinGetCurWinIdx(pOperator);
2,189,276✔
1151
  
1152
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
2,189,276✔
1153
}
1154

1155

1156
static int32_t mergeAlignExtWinGetWinFromTs(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts, STimeWindow** ppWin) {
2,189,276✔
1157
  int32_t blkWinIdx = extWinGetCurWinIdx(pOperator);
2,189,276✔
1158
  
1159
  // TODO handle desc order
1160
  for (int32_t i = blkWinIdx; i < pExtW->pTGrpCtx->pCCtx->pWins->size; ++i) {
13,639,444✔
1161
    STimeWindow* pWin = taosArrayGet(pExtW->pTGrpCtx->pCCtx->pWins, i);
13,639,444✔
1162
    if (ts == pWin->skey) {
13,639,444✔
1163
      extWinSetCurWinIdx(pOperator, i);
2,189,276✔
1164
      *ppWin = pWin;
2,189,276✔
1165
      return TSDB_CODE_SUCCESS;
2,189,276✔
1166
    } else if (ts < pWin->skey) {
11,450,168✔
NEW
1167
      qError("invalid ts %" PRId64 " for tgrp %" PRIu64 " current window idx %d skey %" PRId64, ts, pExtW->lastTGrpId, i, pWin->skey);
×
NEW
1168
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1169
    }
1170
  }
1171
  
NEW
1172
  qError("invalid ts %" PRId64 " to find tgrp %" PRIu64" merge aligned ext window, size:%d", ts, pExtW->lastTGrpId, (int32_t)pExtW->pTGrpCtx->pCCtx->pWins->size);
×
1173
  
NEW
1174
  return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1175
}
1176

1177
static int32_t mergeAlignExtWinFinalizeResult(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pResultBlock) {
2,189,276✔
1178
  int32_t        code = 0, lino = 0;
2,189,276✔
1179
  SMergeAlignedExternalWindowOperator* pMAExtW = pOperator->info;
2,189,276✔
1180
  SExternalWindowOperator*             pExtW = pMAExtW->pExtW;
2,189,276✔
1181
  SExprSupp*     pSup = &pOperator->exprSupp;
2,189,276✔
1182
  SResultRow*  pResultRow = pMAExtW->pResultRow;
2,189,276✔
1183
  
1184
  finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pOperator->pTaskInfo);
2,189,276✔
1185

1186
  if (pResultRow->numOfRows > 0) {
2,189,276✔
1187
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResultBlock, pResultRow->winIdx, pResultRow->numOfRows));
2,189,276✔
1188
  }
1189

1190
_exit:
2,189,276✔
1191

1192
  if (code) {
2,189,276✔
NEW
1193
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1194
  }
1195

1196
  return code;
2,189,276✔
1197
}
1198

1199
// For vtable COLS merge (isDynWindow): fill skipped windows [fromIdx, toIdx) with NULL rows.
1200
// Each COLS merge source must output exactly numOfWins rows so columns align positionally.
NEW
1201
static int32_t mergeAlignExtWinFillEmptyWins(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo,
×
1202
                                              SSDataBlock* pResultBlock, int32_t fromIdx, int32_t toIdx) {
NEW
1203
  SMergeAlignedExternalWindowOperator* pMAExtW = pOperator->info;
×
NEW
1204
  SExternalWindowOperator*             pExtW = pMAExtW->pExtW;
×
NEW
1205
  SExprSupp*                           pSup = &pOperator->exprSupp;
×
NEW
1206
  int32_t                              code = 0, lino = 0;
×
1207

NEW
1208
  for (int32_t i = fromIdx; i < toIdx; ++i) {
×
NEW
1209
    STimeWindow* pWin = taosArrayGet(pExtW->pTGrpCtx->pCCtx->pWins, i);
×
NEW
1210
    if (pWin == NULL) continue;
×
1211

NEW
1212
    extWinSetCurWinIdx(pOperator, i);
×
NEW
1213
    TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMAExtW->pResultRow, pSup, &pExtW->aggSup));
×
NEW
1214
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
×
NEW
1215
    resetResultRow(pMAExtW->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
×
NEW
1216
    pMAExtW->lastFinalizedWinIdx = i;
×
1217
  }
1218

NEW
1219
_exit:
×
NEW
1220
  if (code) {
×
NEW
1221
    qError("%s %s failed at line %d since %s", GET_TASKID(pOperator->pTaskInfo), __func__, lino, tstrerror(code));
×
1222
  }
NEW
1223
  return code;
×
1224
}
1225

1226
static int32_t mergeAlignExtWinAggDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
52,116✔
1227
  SMergeAlignedExternalWindowOperator* pMAExtW = pOperator->info;
52,116✔
1228
  SExternalWindowOperator*             pExtW = pMAExtW->pExtW;
52,116✔
1229

1230
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
52,116✔
1231
  SExprSupp*     pSup = &pOperator->exprSupp;
52,116✔
1232
  int32_t        code = 0, lino = 0;
52,116✔
1233
  STimeWindow *pWin = NULL;
52,116✔
1234

1235
  int32_t startPos = 0;
52,116✔
1236
  int64_t* tsCols = extWinExtractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
52,116✔
1237
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
52,116✔
1238

1239
  extWinSetCurWinIdx(pOperator, 0);
52,116✔
1240
  code = mergeAlignExtWinGetWinFromTs(pOperator, pExtW, ts, &pWin);
52,116✔
1241
  if (code) {
52,116✔
NEW
1242
    qError("failed to get time window for tgrp %" PRIu64 " ts:%" PRId64 ", prim ts index:%d, error:%s", 
×
1243
      pExtW->lastTGrpId, ts, pExtW->primaryTsIndex, tstrerror(code));
UNCOV
1244
    TAOS_CHECK_EXIT(code);
×
1245
  }
1246

1247
  int32_t newWinIdx = extWinGetCurWinIdx(pOperator);
52,116✔
1248

1249
  if (pMAExtW->curTs != INT64_MIN && pMAExtW->curTs != pWin->skey) {
52,116✔
1250
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
3,636✔
1251
    pMAExtW->lastFinalizedWinIdx = pMAExtW->pResultRow->winIdx;
3,636✔
1252
    resetResultRow(pMAExtW->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
3,636✔
1253
  }
1254

1255
  if (pExtW->isDynWindow && pMAExtW->lastFinalizedWinIdx + 1 < newWinIdx) {
52,116✔
NEW
1256
    TAOS_CHECK_EXIT(mergeAlignExtWinFillEmptyWins(pOperator, pResultRowInfo, pResultBlock, pMAExtW->lastFinalizedWinIdx + 1, newWinIdx));
×
NEW
1257
    extWinSetCurWinIdx(pOperator, newWinIdx);
×
1258
  }
1259

1260
  TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMAExtW->pResultRow, pSup, &pExtW->aggSup));
52,116✔
1261

1262
  int32_t currPos = startPos;
52,116✔
1263
  pMAExtW->curTs = pWin->skey;
52,116✔
1264
  
1265
  while (++currPos < pBlock->info.rows) {
4,264,220✔
1266
    if (tsCols[currPos] == pMAExtW->curTs) continue;
4,212,104✔
1267

1268
    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
2,137,160✔
1269
      pMAExtW->curTs, startPos, currPos, tsCols[currPos]); 
1270
    TAOS_CHECK_EXIT(applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
2,137,160✔
1271
                                           currPos - startPos, pBlock->info.rows, pSup->numOfExprs));
1272

1273
    TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, pResultRowInfo, pResultBlock));
2,137,160✔
1274
    pMAExtW->lastFinalizedWinIdx = pMAExtW->pResultRow->winIdx;
2,137,160✔
1275
    resetResultRow(pMAExtW->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
2,137,160✔
1276

1277
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[currPos], &pWin));
2,137,160✔
1278
    newWinIdx = extWinGetCurWinIdx(pOperator);
2,137,160✔
1279

1280
    if (pExtW->isDynWindow && pMAExtW->lastFinalizedWinIdx + 1 < newWinIdx) {
2,137,160✔
NEW
1281
      TAOS_CHECK_EXIT(mergeAlignExtWinFillEmptyWins(pOperator, pResultRowInfo, pResultBlock, pMAExtW->lastFinalizedWinIdx + 1, newWinIdx));
×
NEW
1282
      extWinSetCurWinIdx(pOperator, newWinIdx);
×
1283
    }
1284

1285
    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
2,137,160✔
1286
    startPos = currPos;
2,137,160✔
1287
    
1288
    TAOS_CHECK_EXIT(mergeAlignExtWinSetOutputBuf(pOperator, pResultRowInfo, pWin, &pMAExtW->pResultRow, pSup, &pExtW->aggSup));
2,137,160✔
1289

1290
    pMAExtW->curTs = pWin->skey;
2,137,160✔
1291
  }
1292

1293
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
104,232✔
1294
                                         currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
52,116✔
1295

1296
_exit:
52,116✔
1297

1298
  if (code != 0) {
52,116✔
1299
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1300
    T_LONG_JMP(pTaskInfo->env, code);
×
1301
  }
1302
  
1303
  return code;
52,116✔
1304
}
1305

1306
static int32_t mergeAlignExtWinBuildWinRowIdx(SOperatorInfo* pOperator, SSDataBlock* pInput, SSDataBlock* pResult) {
×
1307
  SExternalWindowOperator* pExtW = pOperator->info;
×
1308
  int64_t* tsCols = extWinExtractTsCol(pInput, pExtW->primaryTsIndex, pOperator->pTaskInfo);
×
1309
  STimeWindow* pWin = NULL;
×
1310
  int32_t code = 0, lino = 0;
×
1311
  int64_t prevTs = INT64_MIN;
×
1312
  
1313
  for (int32_t i = 0; i < pInput->info.rows; ++i) {
×
1314
    if (prevTs == tsCols[i]) {
×
1315
      continue;
×
1316
    }
1317
    
1318
    TAOS_CHECK_EXIT(mergeAlignExtWinGetWinFromTs(pOperator, pExtW, tsCols[i], &pWin));
×
NEW
1319
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pResult, extWinGetCurWinIdx(pOperator), pInput->info.rows - i));
×
1320

1321
    prevTs = tsCols[i];
×
1322
  }
1323

1324
_exit:
×
1325

1326
  if (code != 0) {
×
1327
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1328
  }
1329

1330
  return code;  
×
1331
}
1332

1333
static int32_t mergeAlignExtWinProjectDo(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
1334
                                            SSDataBlock* pResultBlock) {
1335
  SExternalWindowOperator* pExtW = pOperator->info;
×
1336
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
×
1337
  int32_t                  code = 0, lino = 0;
×
1338
  
1339
  TAOS_CHECK_EXIT(projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
×
1340
                        GET_STM_RTINFO(pOperator->pTaskInfo)));
1341

1342
  TAOS_CHECK_EXIT(mergeAlignExtWinBuildWinRowIdx(pOperator, pBlock, pResultBlock));
×
1343

1344
_exit:
×
1345

1346
  if (code != 0) {
×
1347
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1348
  }
1349

1350
  return code;
×
1351
}
1352

1353
static void mergeAlignExtWinDo(SOperatorInfo* pOperator) {
49,692✔
1354
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
49,692✔
1355
  SMergeAlignedExternalWindowOperator* pMAExtW = pOperator->info;
49,692✔
1356
  SExternalWindowOperator*             pExtW = pMAExtW->pExtW;
49,692✔
1357
  SResultRow*                          pResultRow = NULL;
49,692✔
1358
  int32_t                              code = 0;
49,692✔
1359
  SSDataBlock*                         pRes = pExtW->binfo.pRes;
49,692✔
1360
  SExprSupp*                           pSup = &pOperator->exprSupp;
49,692✔
1361
  int32_t                              lino = 0;
49,692✔
1362
  SStreamRuntimeFuncInfo*              pStream = &pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo;
49,692✔
1363

1364
  taosArrayClear(pExtW->pWinRowIdx);
49,692✔
1365
  blockDataCleanup(pRes);
49,692✔
1366

1367
  SSDataBlock* pBlock = NULL;
49,692✔
1368
  while (1) {
1369
    if (pMAExtW->pNewGroup != NULL) {
101,404✔
1370
      pBlock = pMAExtW->pNewGroup;
6,464✔
1371
      pMAExtW->pNewGroup = NULL;
6,464✔
1372
    } else {
1373
      pBlock = getNextBlockFromDownstream(pOperator, 0);
94,940✔
1374
    }
1375

1376
    if (pBlock == NULL) {
101,404✔
1377
      // close last time window
1378
      if (pMAExtW->curTs != INT64_MIN && EEXT_MODE_AGG == pExtW->mode) {
42,824✔
1379
        TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, &pExtW->binfo.resultRowInfo, pRes));
42,016✔
1380
        pMAExtW->lastFinalizedWinIdx = pMAExtW->pResultRow->winIdx;
42,016✔
1381
      }
1382
      // fill remaining empty windows with NULL rows (only for virtual table COLS merge)
1383
      if (pExtW->isDynWindow && EEXT_MODE_AGG == pExtW->mode && pExtW->pTGrpCtx && pExtW->pTGrpCtx->pCCtx && pExtW->pTGrpCtx->pCCtx->pWins) {
42,824✔
NEW
1384
        int32_t totalWins = taosArrayGetSize(pExtW->pTGrpCtx->pCCtx->pWins);
×
NEW
1385
        if (pMAExtW->lastFinalizedWinIdx + 1 < totalWins) {
×
NEW
1386
          TAOS_CHECK_EXIT(mergeAlignExtWinFillEmptyWins(pOperator, &pExtW->binfo.resultRowInfo, pRes, pMAExtW->lastFinalizedWinIdx + 1, totalWins));
×
1387
        }
1388
      }
1389
      setOperatorCompleted(pOperator);
42,824✔
1390
      break;
42,824✔
1391
    }
1392

1393
    extWinResolveBlockIdForPartition(pExtW, pTaskInfo, &pBlock->info.id);
58,580✔
1394

1395
    if (pExtW->lastCGrpId != pBlock->info.id.groupId) {
58,580✔
1396
      if (pMAExtW->curTs != INT64_MIN && EEXT_MODE_AGG == pExtW->mode) {
19,796✔
1397
        TAOS_CHECK_EXIT(mergeAlignExtWinFinalizeResult(pOperator, &pExtW->binfo.resultRowInfo, pRes));
6,464✔
1398

1399
        // Group boundary: always clear row state and curTs after finalize to avoid
1400
        // duplicate finalization and cross-group state reuse on next block.
1401

1402
        if (pMAExtW->pResultRow != NULL) {
6,464✔
1403
          resetResultRow(pMAExtW->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
6,464✔
1404
        }
1405

1406
        pMAExtW->curTs = INT64_MIN;
6,464✔
1407
      }
1408

1409
      if (pRes->info.rows > 0) {
19,796✔
1410
        pMAExtW->pNewGroup = pBlock;
6,464✔
1411
        pMAExtW->curTs = INT64_MIN;
6,464✔
1412
        break;
6,464✔
1413
      }
1414
    }
1415

1416
    TAOS_CHECK_EXIT(extWinSwitchInitCtxs(pExtW, pTaskInfo, &pBlock->info.id));
52,116✔
1417

1418
    pRes->info.scanFlag = pBlock->info.scanFlag;
52,116✔
1419
    pRes->info.id.groupId = pBlock->info.id.baseGId;  // only keep TGroup ID
52,116✔
1420
    
1421
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
52,116✔
1422
    QUERY_CHECK_CODE(code, lino, _exit);
52,116✔
1423

1424
    printDataBlock(pBlock, __func__, "externalwindowAlign", pTaskInfo->id.queryId);
52,116✔
1425
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);
52,116✔
1426

1427
    if (EEXT_MODE_SCALAR == pExtW->mode) {
52,116✔
1428
      TAOS_CHECK_EXIT(mergeAlignExtWinProjectDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
×
1429
    } else {
1430
      TAOS_CHECK_EXIT(mergeAlignExtWinAggDo(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes));
52,116✔
1431
    }
1432

1433
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
52,116✔
1434
      break;
404✔
1435
    }
1436
  }
1437

1438
  pStream->pStreamBlkWinIdx = pExtW->pWinRowIdx;
49,692✔
1439

1440
  extWinPostUpdateStreamRt(pStream, pOperator, pExtW);
49,692✔
1441
  
1442
_exit:
49,692✔
1443

1444
  if (code != 0) {
49,692✔
1445
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1446
    pTaskInfo->code = code;
×
1447
    T_LONG_JMP(pTaskInfo->env, code);
×
1448
  }
1449
}
49,692✔
1450

1451
static int32_t mergeAlignExtWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
90,496✔
1452
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
90,496✔
1453
  SMergeAlignedExternalWindowOperator* pMAExtW = pOperator->info;
90,496✔
1454
  SExternalWindowOperator*             pExtW = pMAExtW->pExtW;
90,496✔
1455
  int32_t                              code = 0;
90,496✔
1456
  int32_t lino = 0;
90,496✔
1457

1458
  if (pOperator->status == OP_EXEC_DONE) {
90,496✔
1459
    (*ppRes) = NULL;
40,804✔
1460
    return TSDB_CODE_SUCCESS;
40,804✔
1461
  }
1462

1463
  SSDataBlock* pRes = pExtW->binfo.pRes;
49,692✔
1464
  blockDataCleanup(pRes);
49,692✔
1465

1466
  mergeAlignExtWinDo(pOperator);
49,692✔
1467

1468
  if (pRes->info.rows > 0 && pOperator->exprSupp.pFilterInfo != NULL) {
49,692✔
1469
    SColumnInfoData* pFilterRes = NULL;
6,464✔
1470
    TAOS_CHECK_EXIT(doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, &pFilterRes));
6,464✔
1471
    colDataDestroy(pFilterRes);
6,464✔
1472
    taosMemoryFree(pFilterRes);
6,464✔
1473
  }
1474

1475
  size_t rows = pRes->info.rows;
49,692✔
1476
  (*ppRes) = (rows == 0) ? NULL : pRes;
49,692✔
1477

1478
_exit:
49,692✔
1479

1480
  if (code != 0) {
49,692✔
1481
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1482
    pTaskInfo->code = code;
×
1483
    T_LONG_JMP(pTaskInfo->env, code);
×
1484
  }
1485
  return code;
49,692✔
1486
}
1487

NEW
1488
static int32_t resetMergeAlignedExtWinOperator(SOperatorInfo* pOperator) {
×
NEW
1489
  int32_t code = 0, lino = 0;
×
NEW
1490
  SMergeAlignedExternalWindowOperator* pMAExtW = pOperator->info;
×
NEW
1491
  SExternalWindowOperator*             pExtW = pMAExtW->pExtW;
×
UNCOV
1492
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
×
UNCOV
1493
  SMergeAlignedIntervalPhysiNode * pPhynode = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;
×
UNCOV
1494
  pOperator->status = OP_NOT_OPENED;
×
1495

1496
  //resetBasicOperatorState(&pExtW->binfo);
1497
  //pMAExtW->pResultRow = NULL;
1498

NEW
1499
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
×
NEW
1500
  pMAExtW->curTs = INT64_MIN;
×
NEW
1501
  pMAExtW->lastFinalizedWinIdx = -1;
×
1502

NEW
1503
  extWinResetResultRows(&pExtW->resultRows);
×
1504

1505
/*
1506
  int32_t code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
1507
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
1508
                             &pTaskInfo->storageAPI.functionStore);
1509
*/                             
1510

NEW
1511
  colDataDestroy(&pExtW->twAggSup.timeWindowData);
×
NEW
1512
  TAOS_CHECK_EXIT(initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window));
×
1513

NEW
1514
  pExtW->outWinIdx = 0;
×
NEW
1515
  pExtW->lastTGrpId = 0;
×
NEW
1516
  pExtW->lastCGrpId = 0;
×
NEW
1517
  pExtW->pTGrpCtx = NULL;
×
NEW
1518
  pExtW->ownTGrpCtx = false;
×
1519

NEW
1520
_exit:
×
1521

NEW
1522
  if (code != 0) {
×
NEW
1523
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1524
  }
1525
  
UNCOV
1526
  return code;
×
1527
}
1528

1529
int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
42,824✔
1530
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
1531
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
42,824✔
1532
  STimeWindow nonStreamExtWinRange = {.skey = INT64_MAX, .ekey = INT64_MIN};
42,824✔
1533
  int32_t code = 0;
42,824✔
1534
  int32_t lino = 0;
42,824✔
1535
  SMergeAlignedExternalWindowOperator* pMAExtW = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
42,824✔
1536
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
42,824✔
1537

1538
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
42,824✔
UNCOV
1539
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
×
1540
  }
1541
  pOperator->pPhyNode = pNode;
42,824✔
1542
  if (!pMAExtW || !pOperator) {
42,824✔
1543
    code = terrno;
×
1544
    goto _error;
×
1545
  }
1546
  initOperatorCostInfo(pOperator);
42,824✔
1547

1548
  pMAExtW->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
42,824✔
1549
  if (!pMAExtW->pExtW) {
42,824✔
1550
    code = terrno;
×
1551
    goto _error;
×
1552
  }
1553

1554
  SExternalWindowOperator* pExtW = pMAExtW->pExtW;
42,824✔
1555
  pExtW->pTaskInfo = pTaskInfo;
42,824✔
1556
  SExprSupp* pSup = &pOperator->exprSupp;
42,824✔
1557
  pSup->hasWindowOrGroup = true;
42,824✔
1558
  pSup->hasWindow = true;
42,824✔
1559
  pMAExtW->curTs = INT64_MIN;
42,824✔
1560
  pMAExtW->lastFinalizedWinIdx = -1;
42,824✔
1561

1562
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
42,824✔
1563
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
42,824✔
1564
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
42,824✔
1565
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
42,824✔
1566
  pExtW->needGroupSort = pPhynode->needGroupSort;
42,824✔
1567
  pExtW->calcWithPartition = pPhynode->calcWithPartition;
42,824✔
1568
  pExtW->extWinSplit = pPhynode->extWinSplit;
42,824✔
1569

1570
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
42,824✔
1571
  initResultSizeInfo(&pOperator->resultInfo, 4096);
42,824✔
1572

1573
  int32_t num = 0;
42,824✔
1574
  SExprInfo* pExprInfo = NULL;
42,824✔
1575
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
42,824✔
1576
  QUERY_CHECK_CODE(code, lino, _error);
42,824✔
1577

1578
  if (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {
42,824✔
1579
    code = extWinInitNonStreamWindowDataFromBlock(pPhynode, pTaskInfo, &nonStreamExtWinRange);
42,824✔
1580
    QUERY_CHECK_CODE(code, lino, _error);
42,824✔
1581
  }
1582

1583
  if (pExtW->mode == EEXT_MODE_AGG) {
42,824✔
1584
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, NULL,
42,824✔
1585
                      &pTaskInfo->storageAPI.functionStore);
1586
    QUERY_CHECK_CODE(code, lino, _error);
42,824✔
1587
  }
1588

1589
  code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
42,824✔
1590
                            pTaskInfo->pStreamRuntimeInfo);
42,824✔
1591
  QUERY_CHECK_CODE(code, lino, _error);
42,824✔
1592

1593
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
42,824✔
1594
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
42,824✔
1595
  initBasicInfo(&pExtW->binfo, pResBlock);
42,824✔
1596

1597
  pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
42,824✔
1598
  TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
42,824✔
1599

1600
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
42,824✔
1601
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
42,824✔
1602
  QUERY_CHECK_CODE(code, lino, _error);
42,824✔
1603

1604
  pExtW->isMergeAlignedExtW = true;
42,824✔
1605
  
1606
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMAExtW, pTaskInfo);
42,824✔
1607
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignExtWinNext, NULL,
42,824✔
1608
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
1609
                                         optrDefaultGetNextExtFn, NULL);
1610
  setOperatorResetStateFn(pOperator, resetMergeAlignedExtWinOperator);
42,824✔
1611

1612
  code = appendDownstream(pOperator, &pDownstream, 1);
42,824✔
1613
  QUERY_CHECK_CODE(code, lino, _error);
42,824✔
1614
  *ppOptrOut = pOperator;
42,824✔
1615
  return code;
42,824✔
1616
  
1617
_error:
×
NEW
1618
  if (pMAExtW) destroyMergeAlignedExternalWindowOperator(pMAExtW);
×
1619
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
1620
  pTaskInfo->code = code;
×
1621
  return code;
×
1622
}
1623

1624
static void extWinResetResultRows(SExtWinResultRows* pRows);
UNCOV
1625
static int32_t resetExternalWindowExprSupp(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo,
×
1626
                                           SExternalWindowPhysiNode* pPhynode) {
UNCOV
1627
  int32_t    code = 0, lino = 0, num = 0;
×
UNCOV
1628
  SExprInfo* pExprInfo = NULL;
×
UNCOV
1629
  cleanupExprSuppWithoutFilter(&pExtW->scalarSupp);
×
1630

UNCOV
1631
  SNodeList* pNodeList = NULL;
×
UNCOV
1632
  if (pPhynode->window.pProjs) {
×
1633
    pNodeList = pPhynode->window.pProjs;
×
1634
  } else {
UNCOV
1635
    pNodeList = pPhynode->window.pExprs;
×
1636
  }
1637

UNCOV
1638
  code = createExprInfo(pNodeList, NULL, &pExprInfo, &num);
×
UNCOV
1639
  QUERY_CHECK_CODE(code, lino, _error);
×
UNCOV
1640
  code = initExprSupp(&pExtW->scalarSupp, pExprInfo, num, &pTaskInfo->storageAPI.functionStore);
×
UNCOV
1641
  QUERY_CHECK_CODE(code, lino, _error);
×
NEW
1642
  pExtW->lastGrpIdx = INT32_MAX;
×
NEW
1643
  extWinResetResultRows(&pExtW->resultRows);
×
1644
  return code;
×
1645
_error:
×
1646
  if (code != TSDB_CODE_SUCCESS) {
×
1647
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1648
    pTaskInfo->code = code;
×
1649
  }
1650
  return code;
×
1651
}
1652

1653

UNCOV
1654
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
×
UNCOV
1655
  int32_t code = 0, lino = 0;
×
UNCOV
1656
  SExternalWindowOperator* pExtW = pOperator->info;
×
UNCOV
1657
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
×
UNCOV
1658
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pOperator->pPhyNode;
×
UNCOV
1659
  pOperator->status = OP_NOT_OPENED;
×
1660

1661
  //resetBasicOperatorState(&pExtW->binfo);
UNCOV
1662
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
×
1663

UNCOV
1664
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);
×
1665

1666
/*
1667
  int32_t code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
1668
  if (code == 0) {
1669
    code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
1670
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
1671
                       &pTaskInfo->storageAPI.functionStore);
1672
  }
1673
*/
UNCOV
1674
  TAOS_CHECK_EXIT(resetExternalWindowExprSupp(pExtW, pTaskInfo, pPhynode));
×
UNCOV
1675
  colDataDestroy(&pExtW->twAggSup.timeWindowData);
×
UNCOV
1676
  TAOS_CHECK_EXIT(initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window));
×
1677

NEW
1678
  pExtW->resWinIdx = 0;
×
NEW
1679
  pExtW->lastOutputIter = 0;
×
UNCOV
1680
  pExtW->outWinIdx = 0;
×
NEW
1681
  pExtW->lastTGrpId = 0;
×
NEW
1682
  pExtW->lastCGrpId = 0;
×
NEW
1683
  pExtW->pTGrpCtx = NULL;
×
NEW
1684
  pExtW->ownTGrpCtx = false;
×
UNCOV
1685
  pExtW->isDynWindow = false;
×
1686

NEW
1687
  extWinResetResultRows(&pExtW->resultRows);
×
1688
  
UNCOV
1689
  qDebug("%s ext window stat at reset, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64, 
×
1690
      pTaskInfo->id.str, pExtW->stat.resBlockCreated, pExtW->stat.resBlockDestroyed, pExtW->stat.resBlockRecycled, 
1691
      pExtW->stat.resBlockReused, pExtW->stat.resBlockAppend);
1692

UNCOV
1693
_exit:
×
1694

UNCOV
1695
  if (code) {
×
1696
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1697
  }
1698
  
UNCOV
1699
  return code;
×
1700
}
1701

1702
static EDealRes extWinHasCountLikeFunc(SNode* pNode, void* res) {
6,920,818✔
1703
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
6,920,818✔
1704
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
2,527,825✔
1705
    if (fmIsCountLikeFunc(pFunc->funcId) || (pFunc->hasOriginalFunc && fmIsCountLikeFunc(pFunc->originalFuncId))) {
2,527,825✔
1706
      *(bool*)res = true;
880,633✔
1707
      return DEAL_RES_END;
881,037✔
1708
    }
1709
  }
1710
  return DEAL_RES_CONTINUE;
6,040,589✔
1711
}
1712

1713

UNCOV
1714
static int32_t extWinCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
×
UNCOV
1715
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1716
  int32_t lino = 0;
×
UNCOV
1717
  SSDataBlock* pBlock = NULL;
×
UNCOV
1718
  if (!tsCountAlwaysReturnValue) {
×
1719
    return TSDB_CODE_SUCCESS;
×
1720
  }
1721

UNCOV
1722
  SExternalWindowOperator* pExtW = pOperator->info;
×
1723

UNCOV
1724
  if (!pExtW->hasCountFunc) {
×
1725
    return TSDB_CODE_SUCCESS;
×
1726
  }
1727

UNCOV
1728
  code = createDataBlock(&pBlock);
×
UNCOV
1729
  if (code) {
×
1730
    return code;
×
1731
  }
1732

UNCOV
1733
  pBlock->info.rows = 1;
×
UNCOV
1734
  pBlock->info.capacity = 0;
×
1735

NEW
1736
  SExprSupp* pSupps[] = {&pOperator->exprSupp, &pExtW->scalarSupp};
×
NEW
1737
  for (int32_t s = 0; s < 2; ++s) {
×
NEW
1738
    SExprSupp* pSupp = pSupps[s];
×
NEW
1739
    if (pSupp == NULL || pSupp->pExprInfo == NULL) {
×
NEW
1740
      continue;
×
1741
    }
1742

NEW
1743
    for (int32_t i = 0; i < pSupp->numOfExprs; ++i) {
×
NEW
1744
      SColumnInfoData colInfo = {0};
×
NEW
1745
      colInfo.hasNull = true;
×
NEW
1746
      colInfo.info.type = TSDB_DATA_TYPE_NULL;
×
NEW
1747
      colInfo.info.bytes = 1;
×
1748

NEW
1749
      SExprInfo* pOneExpr = &pSupp->pExprInfo[i];
×
NEW
1750
      for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
×
NEW
1751
        SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
×
NEW
1752
        if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
×
NEW
1753
          continue;
×
1754
        }
1755

UNCOV
1756
        int32_t slotId = pFuncParam->pCol->slotId;
×
UNCOV
1757
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
×
NEW
1758
        if (slotId < numOfCols) {
×
NEW
1759
          continue;
×
1760
        }
1761

NEW
1762
        code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
×
NEW
1763
        QUERY_CHECK_CODE(code, lino, _end);
×
1764

NEW
1765
        for (int32_t k = numOfCols; k < slotId + 1; ++k) {
×
NEW
1766
          void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
×
NEW
1767
          QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
×
1768
        }
1769
      }
1770
    }
1771
  }
1772

UNCOV
1773
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
×
UNCOV
1774
  QUERY_CHECK_CODE(code, lino, _end);
×
1775

UNCOV
1776
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
×
UNCOV
1777
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
×
UNCOV
1778
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
×
1779
    colDataSetNULL(pColInfoData, 0);
1780
  }
UNCOV
1781
  *ppBlock = pBlock;
×
1782

UNCOV
1783
_end:
×
UNCOV
1784
  if (code != TSDB_CODE_SUCCESS) {
×
1785
    blockDataDestroy(pBlock);
×
1786
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1787
  }
UNCOV
1788
  return code;
×
1789
}
1790

1791

1792

1793
static int extWinTsWinCompare(const void* pLeft, const void* pRight) {
25,902,745✔
1794
  int64_t ts = *(int64_t*)pLeft;
25,902,745✔
1795
  SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)pRight;
25,902,745✔
1796
  if (ts < pWin->tw.skey) {
25,902,745✔
1797
    return -1;
15,801,048✔
1798
  }
1799
  if (ts >= pWin->tw.ekey) {
10,101,697✔
1800
    return 1;
7,720,441✔
1801
  }
1802

1803
  return 0;
2,381,256✔
1804
}
1805

1806

NEW
1807
static int32_t extWinGetMultiTbWinFromTs(SOperatorInfo* pOperator, SArray* pWins, int64_t* tsCol, int64_t rowNum, int32_t* startPos) {
×
NEW
1808
  int32_t idx = taosArraySearchIdx(pWins, tsCol, extWinTsWinCompare, TD_EQ);
×
UNCOV
1809
  if (idx >= 0) {
×
UNCOV
1810
    *startPos = 0;
×
UNCOV
1811
    return idx;
×
1812
  }
1813

UNCOV
1814
  SExtWinTimeWindow* pWin = NULL;
×
UNCOV
1815
  int32_t w = 0;
×
UNCOV
1816
  for (int64_t i = 1; i < rowNum; ++i) {
×
NEW
1817
    for (; w < pWins->size; ++w) {
×
NEW
1818
      pWin = TARRAY_GET_ELEM(pWins, w);
×
UNCOV
1819
      if (tsCol[i] < pWin->tw.skey) {
×
UNCOV
1820
        break;
×
1821
      }
1822
      
UNCOV
1823
      if (tsCol[i] < pWin->tw.ekey) {
×
UNCOV
1824
        *startPos = i;
×
UNCOV
1825
        return w;
×
1826
      }
1827
    }
1828
  }
1829

UNCOV
1830
  return -1;
×
1831
}
1832

UNCOV
1833
static int32_t extWinGetNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
×
UNCOV
1834
  SExternalWindowOperator* pExtW = pOperator->info;
×
UNCOV
1835
  if ((*startPos) >= pInfo->rows) {
×
UNCOV
1836
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
×
1837
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
UNCOV
1838
    *ppWin = NULL;
×
UNCOV
1839
    return TSDB_CODE_SUCCESS;
×
1840
  }
1841

NEW
1842
  SExtWinCalcGrpCtx* pCCtx = pExtW->pTGrpCtx->pCCtx;
×
1843
  
NEW
1844
  if (pCCtx->blkWinIdx < 0) {
×
NEW
1845
    pCCtx->blkWinIdx = extWinGetCurWinIdx(pOperator);
×
1846
  } else {
NEW
1847
    pCCtx->blkWinIdx++;
×
1848
  }
1849

NEW
1850
  if (pCCtx->blkWinIdx >= pCCtx->pWins->size) {
×
UNCOV
1851
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
×
1852
        GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, (int32_t)pCCtx->pWins->size);
UNCOV
1853
    *ppWin = NULL;
×
UNCOV
1854
    return TSDB_CODE_SUCCESS;
×
1855
  }
1856
  
NEW
1857
  SExtWinTimeWindow* pWin = taosArrayGet(pCCtx->pWins, pCCtx->blkWinIdx);
×
UNCOV
1858
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
×
UNCOV
1859
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
1860
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pCCtx->blkWinIdx, pWin->tw.skey);
UNCOV
1861
    *ppWin = NULL;
×
UNCOV
1862
    return TSDB_CODE_SUCCESS;
×
1863
  }
1864

UNCOV
1865
  int32_t r = *startPos;
×
1866

NEW
1867
  qDebug("%s %s start to get novlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, r);
×
1868

1869
  // TODO handle desc order
NEW
1870
  for (; pCCtx->blkWinIdx < pCCtx->pWins->size; ++pCCtx->blkWinIdx) {
×
NEW
1871
    pWin = taosArrayGet(pCCtx->pWins, pCCtx->blkWinIdx);
×
UNCOV
1872
    for (; r < pInfo->rows; ++r) {
×
UNCOV
1873
      if (tsCol[r] < pWin->tw.skey) {
×
UNCOV
1874
        continue;
×
1875
      }
1876

UNCOV
1877
      if (tsCol[r] < pWin->tw.ekey) {
×
NEW
1878
        extWinSetCurWinIdx(pOperator, pCCtx->blkWinIdx);
×
UNCOV
1879
        *ppWin = pWin;
×
UNCOV
1880
        *startPos = r;
×
UNCOV
1881
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
×
1882

UNCOV
1883
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
×
1884
            GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
1885
        
UNCOV
1886
        return TSDB_CODE_SUCCESS;
×
1887
      }
1888

UNCOV
1889
      if (!pOperator->pTaskInfo->pStreamRuntimeInfo && tsCol[r] >= pWin->tw.ekey) {
×
NEW
1890
        extWinSetCurWinIdx(pOperator, pCCtx->blkWinIdx);
×
UNCOV
1891
        *ppWin = pWin;
×
UNCOV
1892
        *startPos = r;
×
UNCOV
1893
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
×
1894

UNCOV
1895
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
×
1896
               GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
1897

UNCOV
1898
        return TSDB_CODE_SUCCESS;
×
1899
      }
1900

UNCOV
1901
      break;
×
1902
    }
1903

UNCOV
1904
    if (r == pInfo->rows) {
×
UNCOV
1905
      break;
×
1906
    }
1907
  }
1908

UNCOV
1909
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
×
1910
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1911

UNCOV
1912
  *ppWin = NULL;
×
UNCOV
1913
  return TSDB_CODE_SUCCESS;
×
1914
}
1915

1916
static int32_t extWinGetOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
2,073,857,990✔
1917
  SExternalWindowOperator* pExtW = pOperator->info;
2,073,857,990✔
1918
  SExtWinCalcGrpCtx* pCCtx = pExtW->pTGrpCtx->pCCtx;
2,073,857,990✔
1919

1920
  if (pCCtx->blkWinIdx < 0) {
2,073,857,990✔
1921
    pCCtx->blkWinIdx = pCCtx->blkWinStartIdx;
1,774,073✔
1922
  } else {
1923
    pCCtx->blkWinIdx++;
2,072,083,917✔
1924
  }
1925

1926
  if (pCCtx->blkWinIdx >= pCCtx->pWins->size) {
2,073,857,990✔
1927
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
783,331✔
1928
        GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, (int32_t)pCCtx->pWins->size);
1929
    *ppWin = NULL;
783,331✔
1930
    return TSDB_CODE_SUCCESS;
783,331✔
1931
  }
1932
  
1933
  SExtWinTimeWindow* pWin = taosArrayGet(pCCtx->pWins, pCCtx->blkWinIdx);
2,073,074,659✔
1934
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
2,073,074,659✔
1935
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
887,318✔
1936
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pCCtx->blkWinIdx, pWin->tw.skey);
1937
    *ppWin = NULL;
887,318✔
1938
    return TSDB_CODE_SUCCESS;
887,318✔
1939
  }
1940

1941
  int64_t r = 0;
2,072,187,341✔
1942

1943
  qDebug("%s %s start to get ovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, pCCtx->blkRowStartIdx);
2,072,187,341✔
1944
  
1945
  // TODO handle desc order
1946
  for (; pCCtx->blkWinIdx < pCCtx->pWins->size; ++pCCtx->blkWinIdx) {
2,147,483,647✔
1947
    pWin = taosArrayGet(pCCtx->pWins, pCCtx->blkWinIdx);
2,147,483,647✔
1948
    for (r = pCCtx->blkRowStartIdx; r < pInfo->rows; ++r) {
2,147,483,647✔
1949
      if (tsCol[r] < pWin->tw.skey) {
2,147,483,647✔
1950
        pCCtx->blkRowStartIdx = r + 1;
2,147,483,647✔
1951
        continue;
2,147,483,647✔
1952
      }
1953

1954
      if (tsCol[r] < pWin->tw.ekey) {
2,147,483,647✔
1955
        extWinSetCurWinIdx(pOperator, pCCtx->blkWinIdx);
2,072,083,917✔
1956
        *ppWin = pWin;
2,072,083,917✔
1957
        *startPos = r;
2,072,083,917✔
1958
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
2,072,083,917✔
1959

1960
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
2,072,083,917✔
1961
            GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);
1962
        
1963
        if ((r + *winRows) < pInfo->rows) {
2,072,083,917✔
1964
          pCCtx->blkWinStartIdx = pCCtx->blkWinIdx + 1;
2,070,659,822✔
1965
          pCCtx->blkWinStartSet = true;
2,070,659,822✔
1966
        }
1967
        
1968
        return TSDB_CODE_SUCCESS;
2,072,083,917✔
1969
      }
1970

1971
      break;
213,754,586✔
1972
    }
1973

1974
    if (r >= pInfo->rows) {
213,856,394✔
1975
      if (!pCCtx->blkWinStartSet) {
101,808✔
1976
        pCCtx->blkWinStartIdx = pCCtx->blkWinIdx;
101,808✔
1977
      }
1978
      
1979
      break;
101,808✔
1980
    }
1981
  }
1982

1983
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
103,424✔
1984
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
1985

1986
  *ppWin = NULL;
103,424✔
1987
  return TSDB_CODE_SUCCESS;
103,424✔
1988
}
1989

1990

UNCOV
1991
static int32_t extWinGetMultiTbNoOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
×
UNCOV
1992
  SExternalWindowOperator* pExtW = pOperator->info;
×
NEW
1993
  SExtWinCalcGrpCtx* pCCtx = pExtW->pTGrpCtx->pCCtx;
×
1994

UNCOV
1995
  if ((*startPos) >= pInfo->rows) {
×
UNCOV
1996
    qDebug("%s %s blk rowIdx %d reach the end, size: %d, skip block", 
×
1997
        GET_TASKID(pOperator->pTaskInfo), __func__, *startPos, (int32_t)pInfo->rows);
UNCOV
1998
    *ppWin = NULL;
×
UNCOV
1999
    return TSDB_CODE_SUCCESS;
×
2000
  }
2001
  
NEW
2002
  if (pCCtx->blkWinIdx < 0) {
×
NEW
2003
    pCCtx->blkWinIdx = extWinGetMultiTbWinFromTs(pOperator, pCCtx->pWins, tsCol, pInfo->rows, startPos);
×
NEW
2004
    if (pCCtx->blkWinIdx < 0) {
×
UNCOV
2005
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
×
2006
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
UNCOV
2007
      *ppWin = NULL;
×
UNCOV
2008
      return TSDB_CODE_SUCCESS;
×
2009
    }
2010

NEW
2011
    extWinSetCurWinIdx(pOperator, pCCtx->blkWinIdx);
×
NEW
2012
    *ppWin = taosArrayGet(pCCtx->pWins, pCCtx->blkWinIdx);
×
UNCOV
2013
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
×
2014

UNCOV
2015
    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
×
2016
        GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);
2017
    
UNCOV
2018
    return TSDB_CODE_SUCCESS;
×
2019
  } else {
NEW
2020
    pCCtx->blkWinIdx++;
×
2021
  }
2022

NEW
2023
  if (pCCtx->blkWinIdx >= pCCtx->pWins->size) {
×
UNCOV
2024
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
×
2025
        GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, (int32_t)pCCtx->pWins->size);
UNCOV
2026
    *ppWin = NULL;
×
UNCOV
2027
    return TSDB_CODE_SUCCESS;
×
2028
  }
2029
  
NEW
2030
  SExtWinTimeWindow* pWin = taosArrayGet(pCCtx->pWins, pCCtx->blkWinIdx);
×
UNCOV
2031
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
×
UNCOV
2032
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
×
2033
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pCCtx->blkWinIdx, pWin->tw.skey);
UNCOV
2034
    *ppWin = NULL;
×
UNCOV
2035
    return TSDB_CODE_SUCCESS;
×
2036
  }
2037

UNCOV
2038
  int32_t r = *startPos;
×
2039

NEW
2040
  qDebug("%s %s start to get mnovlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, r);
×
2041

2042
  // TODO handle desc order
NEW
2043
  for (; pCCtx->blkWinIdx < pCCtx->pWins->size; ++pCCtx->blkWinIdx) {
×
NEW
2044
    pWin = taosArrayGet(pCCtx->pWins, pCCtx->blkWinIdx);
×
UNCOV
2045
    for (; r < pInfo->rows; ++r) {
×
UNCOV
2046
      if (tsCol[r] < pWin->tw.skey) {
×
UNCOV
2047
        continue;
×
2048
      }
2049

UNCOV
2050
      if (tsCol[r] < pWin->tw.ekey) {
×
NEW
2051
        extWinSetCurWinIdx(pOperator, pCCtx->blkWinIdx);
×
UNCOV
2052
        *ppWin = pWin;
×
UNCOV
2053
        *startPos = r;
×
UNCOV
2054
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
×
2055

UNCOV
2056
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
×
2057
            GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
2058
        
UNCOV
2059
        return TSDB_CODE_SUCCESS;
×
2060
      }
2061

UNCOV
2062
      if (!pOperator->pTaskInfo->pStreamRuntimeInfo && tsCol[r] >= pWin->tw.ekey) {
×
NEW
2063
        extWinSetCurWinIdx(pOperator, pCCtx->blkWinIdx);
×
UNCOV
2064
        *ppWin = pWin;
×
UNCOV
2065
        *startPos = r;
×
UNCOV
2066
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
×
2067

UNCOV
2068
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk",
×
2069
               GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, r, tsCol[r], tsCol[r + *winRows - 1]);
2070

UNCOV
2071
        return TSDB_CODE_SUCCESS;
×
2072
      }
2073

UNCOV
2074
      break;
×
2075
    }
2076

UNCOV
2077
    if (r == pInfo->rows) {
×
2078
      break;
×
2079
    }
2080
  }
2081

2082
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
×
2083
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
2084

2085
  *ppWin = NULL;
×
2086
  return TSDB_CODE_SUCCESS;
×
2087
}
2088

2089
static int32_t extWinGetFirstWinFromTs(SOperatorInfo* pOperator, SArray* pWins, int64_t* tsCol,
2,625,597✔
2090
                                       int64_t rowNum, int32_t* startPos) {
2091
  SExtWinTimeWindow* pWin = NULL;
2,625,597✔
2092
  int32_t            idx = taosArraySearchIdx(pWins, tsCol, extWinTsWinCompare, TD_EQ);
2,625,597✔
2093
  if (idx >= 0) {
2,625,597✔
2094
    for (int i = idx - 1; i >= 0; --i) {
2,381,256✔
2095
      pWin = TARRAY_GET_ELEM(pWins, i);
1,204,458✔
2096
      if (extWinTsWinCompare(tsCol, pWin) == 0) {
1,204,458✔
2097
        idx = i;
2,020✔
2098
      } else {
2099
        break;
1,202,438✔
2100
      }
2101
    }
2102
    *startPos = 0;
2,379,236✔
2103
    return idx;
2,379,236✔
2104
  }
2105

2106
  pWin = NULL;
246,361✔
2107
  int32_t w = 0;
246,361✔
2108
  for (int64_t i = 1; i < rowNum; ++i) {
87,441,961✔
2109
    for (; w < pWins->size; ++w) {
480,311,930✔
2110
      pWin = TARRAY_GET_ELEM(pWins, w);
480,308,698✔
2111
      if (tsCol[i] < pWin->tw.skey) {
480,308,698✔
2112
        break;
87,192,368✔
2113
      }
2114

2115
      if (tsCol[i] < pWin->tw.ekey) {
393,116,330✔
2116
        *startPos = i;
213,183✔
2117
        return w;
213,183✔
2118
      }
2119
    }
2120
  }
2121

2122
  return -1;
33,178✔
2123
}
2124

2125
static int32_t extWinGetMultiTbOvlpWin(SOperatorInfo* pOperator, int64_t* tsCol, int32_t* startPos, SDataBlockInfo* pInfo, SExtWinTimeWindow** ppWin, int32_t* winRows) {
2,147,483,647✔
2126
  SExternalWindowOperator* pExtW = pOperator->info;
2,147,483,647✔
2127
  SExtWinCalcGrpCtx* pCCtx = pExtW->pTGrpCtx->pCCtx;
2,147,483,647✔
2128

2129
  if (pCCtx->blkWinIdx < 0) {
2,147,483,647✔
2130
    pCCtx->blkWinIdx = extWinGetFirstWinFromTs(pOperator, pCCtx->pWins, tsCol, pInfo->rows, startPos);
2,625,597✔
2131
    if (pCCtx->blkWinIdx < 0) {
2,625,597✔
2132
      qDebug("%s %s blk TR[%" PRId64 ", %" PRId64 ") not in any win, skip block", 
33,178✔
2133
          GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
2134
      *ppWin = NULL;
33,178✔
2135
      return TSDB_CODE_SUCCESS;
33,178✔
2136
    }
2137

2138
    extWinSetCurWinIdx(pOperator, pCCtx->blkWinIdx);
2,592,419✔
2139
    *ppWin = taosArrayGet(pCCtx->pWins, pCCtx->blkWinIdx);
2,592,419✔
2140
    *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, *startPos, (*ppWin)->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
2,592,419✔
2141
    
2142
    qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
2,592,419✔
2143
        GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, (*ppWin)->tw.skey, (*ppWin)->tw.ekey, *winRows, *startPos, tsCol[*startPos], tsCol[*startPos + *winRows - 1]);
2144
    
2145
    return TSDB_CODE_SUCCESS;
2,592,419✔
2146
  } else {
2147
    pCCtx->blkWinIdx++;
2,147,483,647✔
2148
  }
2149

2150
  if (pCCtx->blkWinIdx >= pCCtx->pWins->size) {
2,147,483,647✔
2151
    qDebug("%s %s ext win blk idx %d reach the end, size: %d, skip block", 
1,048,586✔
2152
        GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, (int32_t)pCCtx->pWins->size);
2153
    *ppWin = NULL;
1,048,586✔
2154
    return TSDB_CODE_SUCCESS;
1,048,586✔
2155
  }
2156
  
2157
  SExtWinTimeWindow* pWin = taosArrayGet(pCCtx->pWins, pCCtx->blkWinIdx);
2,147,483,647✔
2158
  if (tsCol[pInfo->rows - 1] < pWin->tw.skey) {
2,147,483,647✔
2159
    qDebug("%s %s block end ts %" PRId64 " is small than curr win %d skey %" PRId64 ", skip block", 
1,538,965✔
2160
        GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[pInfo->rows - 1], pCCtx->blkWinIdx, pWin->tw.skey);
2161
    *ppWin = NULL;
1,538,965✔
2162
    return TSDB_CODE_SUCCESS;
1,538,965✔
2163
  }
2164

2165
  int64_t r = 0;
2,147,483,647✔
2166

2167
  qDebug("%s %s start to get movlp win from winIdx %d rowIdx %d", GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, pCCtx->blkRowStartIdx);
2,147,483,647✔
2168

2169
  // TODO handle desc order
2170
  for (; pCCtx->blkWinIdx < pCCtx->pWins->size; ++pCCtx->blkWinIdx) {
2,147,483,647✔
2171
    pWin = taosArrayGet(pCCtx->pWins, pCCtx->blkWinIdx);
2,147,483,647✔
2172
    for (r = pCCtx->blkRowStartIdx; r < pInfo->rows; ++r) {
2,147,483,647✔
2173
      if (tsCol[r] < pWin->tw.skey) {
2,147,483,647✔
2174
        pCCtx->blkRowStartIdx = r + 1;
2,147,483,647✔
2175
        continue;
2,147,483,647✔
2176
      }
2177

2178
      if (tsCol[r] < pWin->tw.ekey) {
2,147,483,647✔
2179
        extWinSetCurWinIdx(pOperator, pCCtx->blkWinIdx);
2,147,483,647✔
2180
        *ppWin = pWin;
2,147,483,647✔
2181
        *startPos = r;
2,147,483,647✔
2182
        *winRows = getNumOfRowsInTimeWindow(pInfo, tsCol, r, pWin->tw.ekey - 1, binarySearchForKey, NULL, pExtW->binfo.inputTsOrder);
2,147,483,647✔
2183

2184
        qDebug("%s %s the %dth ext win TR[%" PRId64 ", %" PRId64 ") got %d rows rowStartidx %d ts[%" PRId64 ", %" PRId64 "] in blk", 
2,147,483,647✔
2185
            GET_TASKID(pOperator->pTaskInfo), __func__, pCCtx->blkWinIdx, pWin->tw.skey, pWin->tw.ekey, *winRows, (int32_t)r, tsCol[r], tsCol[r + *winRows - 1]);
2186
        
2187
        return TSDB_CODE_SUCCESS;
2,147,483,647✔
2188
      }
2189

2190
      break;
218,842,969✔
2191
    }
2192

2193
    if (r >= pInfo->rows) {
218,846,221✔
2194
      break;
3,252✔
2195
    }
2196
  }
2197

2198
  qDebug("%s %s no more ext win in block, TR[%" PRId64 ", %" PRId64 "), skip it", 
4,868✔
2199
      GET_TASKID(pOperator->pTaskInfo), __func__, tsCol[0], tsCol[pInfo->rows - 1]);
2200

2201
  *ppWin = NULL;
4,868✔
2202
  return TSDB_CODE_SUCCESS;
4,868✔
2203
}
2204

2205

2206
static int32_t extWinGetResultRow(SExecTaskInfo* pTaskInfo, SExternalWindowOperator* pExtW, int32_t winIdx, int32_t resultRowSize, SResultRow** ppRes) {
2,147,483,647✔
2207
  int32_t code = 0, lino = 0;
2,147,483,647✔
2208
  SExtWinResultRows* pRows = &pExtW->resultRows;
2,147,483,647✔
2209
  while (true) {
2210
    if (pRows->pResultRows[pRows->resRowsIdx] && pRows->resRowIdx < pRows->resRowSize) {
2,147,483,647✔
2211
      break;
2,147,483,647✔
2212
    }
2213

2214
    if (NULL == pRows->pResultRows[pRows->resRowsIdx]) {
1,232,056✔
2215
      pRows->pResultRows[pRows->resRowsIdx] = taosMemoryMalloc(pRows->resRowSize * resultRowSize);
1,202,489✔
2216
      TSDB_CHECK_NULL(pRows->pResultRows[pRows->resRowsIdx], code, lino, _exit, terrno);
1,202,489✔
2217
      pRows->resRowAllcNum += pRows->resRowSize;
1,202,489✔
2218
      continue;
1,202,489✔
2219
    }
2220

2221
    // pRows->resRowIdx >= pRows->resRowSize
2222
    
2223
    pRows->resRowIdx = 0;
29,567✔
2224
    pRows->resRowsIdx++;
29,567✔
2225

2226
    if (pRows->resRowsIdx >= pRows->resRowsSize) {
29,567✔
NEW
2227
      int32_t oldSize = pRows->resRowsSize;
×
NEW
2228
      pRows->resRowsSize += EXT_WIN_RES_ROWS_ALLOC_SIZE;
×
NEW
2229
      pRows->pResultRows = taosMemoryRealloc(pRows->pResultRows, pRows->resRowsSize * POINTER_BYTES);
×
NEW
2230
      TSDB_CHECK_NULL(pRows->pResultRows, code, lino, _exit, terrno);
×
NEW
2231
      memset(pRows->pResultRows + oldSize, 0, (pRows->resRowsSize - oldSize) * POINTER_BYTES);
×
2232
    }
2233
  }
2234

2235
  pExtW->pTGrpCtx->pCCtx->outWinTotalNum++;
2,147,483,647✔
2236
  //TSDB_CHECK_NULL(taosArrayPush(pExtW->pTGrpCtx->outWinBufIdx, &winIdx), code, lino, _exit, terrno);
2237

2238
  *ppRes = (SResultRow*)((char*)pRows->pResultRows[pRows->resRowsIdx] + pRows->resRowIdx++ * resultRowSize);
2,147,483,647✔
2239

2240
_exit:
2,147,483,647✔
2241

2242
  if (code) {
2,147,483,647✔
NEW
2243
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
×
2244
  }
2245

2246
  return code;
2,147,483,647✔
2247
}
2248

2249
static FORCE_INLINE SResultRow* extWinGetResultRowByIdx(SExternalWindowOperator* pExtW, int32_t resWinIdx,
2250
                                                         int32_t resultRowSize) {
2251
  SExtWinResultRows* pRows = &pExtW->resultRows;
1,626,976,181✔
2252
  int32_t            resRowsIdx = resWinIdx / pRows->resRowSize;
1,626,976,181✔
2253
  if (resRowsIdx >= pRows->resRowsSize || pRows->pResultRows[resRowsIdx] == NULL) {
1,626,976,181✔
NEW
2254
    return NULL;
×
2255
  }
2256

2257
  return (SResultRow*)((char*)pRows->pResultRows[resRowsIdx] + (resWinIdx % pRows->resRowSize) * resultRowSize);
1,626,976,181✔
2258
}
2259

2260
int32_t extWinInitResRows(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
1,395,598✔
2261
  int32_t code = 0, lino = 0;
1,395,598✔
2262
  SExtWinResultRows* pRows = &pExtW->resultRows;
1,395,598✔
2263

2264
  pRows->resRowsSize = EXT_WIN_RES_ROWS_ALLOC_SIZE;
1,395,598✔
2265
  pRows->resRowSize = 4096;
1,395,598✔
2266
  pRows->pResultRows = taosMemoryCalloc(pRows->resRowsSize, POINTER_BYTES);
1,395,598✔
2267
  TSDB_CHECK_NULL(pRows->pResultRows, code, lino, _exit, terrno);
1,395,598✔
2268

2269
_exit:
1,395,598✔
2270

2271
  if (code) {
1,395,598✔
NEW
2272
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
×
2273
  }
2274

2275
  return code;
1,395,598✔
2276
}
2277

2278

2279
static int32_t extWinAggSetWinOutputBuf(SOperatorInfo* pOperator, SExtWinTimeWindow* win, SExprSupp* pSupp, 
2,147,483,647✔
2280
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
2281
  int32_t code = 0, lino = 0;
2,147,483,647✔
2282
  SResultRow* pResultRow = NULL;
2,147,483,647✔
2283
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
2,147,483,647✔
2284

2285
  if (win->resWinIdx >= 0) {
2,147,483,647✔
2286
    pResultRow = extWinGetResultRowByIdx(pExtW, win->resWinIdx, pAggSup->resultRowSize);
1,626,976,181✔
2287
    TSDB_CHECK_NULL(pResultRow, code, lino, _exit, TSDB_CODE_INVALID_PARA);
1,626,976,181✔
2288
  } else {
2289
    win->resWinIdx = pExtW->resWinIdx++;
2,147,483,647✔
2290
    
2291
    qDebug("set window [%" PRId64 ", %" PRId64 "] outIdx:%d", win->tw.skey, win->tw.ekey, win->resWinIdx);
2,147,483,647✔
2292

2293
    TAOS_CHECK_EXIT(extWinGetResultRow(pTaskInfo, pExtW, win->resWinIdx, pAggSup->resultRowSize, &pResultRow));
2,147,483,647✔
2294
    
2295
    memset(pResultRow, 0, pAggSup->resultRowSize);
2,147,483,647✔
2296

2297
    pResultRow->winIdx = extWinGetCurWinIdx(pOperator);
2,147,483,647✔
2298
    TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
2,147,483,647✔
2299
  }
2300

2301
  // set time window for current result
2302
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
2,147,483,647✔
2303

2304
_exit:
2,147,483,647✔
2305
  
2306
  if (code) {
2,147,483,647✔
2307
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2308
  }
2309

2310
  return code;
2,147,483,647✔
2311
}
2312

2313
static int32_t extWinAggDo(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
2,147,483,647✔
2314
                                  SSDataBlock* pInputBlock) {
2315
  if (pOperator->pTaskInfo->pStreamRuntimeInfo && forwardRows == 0) {
2,147,483,647✔
2316
    return TSDB_CODE_SUCCESS;
×
2317
  }
2318

2319
  SExprSupp*               pSup = &pOperator->exprSupp;
2,147,483,647✔
2320
  SExternalWindowOperator* pExtW = pOperator->info;
2,147,483,647✔
2321
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
2,147,483,647✔
2322
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
2,147,483,647✔
2323

2324
}
2325

2326
static bool extWinLastWinClosed(SExternalWindowOperator* pExtW) {
10,177,972✔
2327
  if (pExtW->resWinIdx <= 0 || (pExtW->multiTableMode && !pExtW->inputHasOrder)) {
10,177,972✔
2328
    return false;
4,087,268✔
2329
  }
2330

2331
  if (pExtW->pTGrpCtx == NULL || pExtW->pTGrpCtx->pCCtx == NULL || pExtW->pTGrpCtx->pCCtx->lastWinIdx < 0) {
6,090,704✔
2332
    return false;
2,424✔
2333
  }
2334

2335
  if (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc) {
6,088,280✔
2336
    return true;
6,088,280✔
2337
  }
2338

NEW
2339
  SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->pTGrpCtx->pCCtx->lastWinIdx);
×
NEW
2340
  if (pList == NULL) {
×
NEW
2341
    return false;
×
2342
  }
2343
  if (0 == listNEles(pList)) {
×
2344
    return true;
×
2345
  }
2346

2347
  SListNode* pNode = listTail(pList);
×
2348
  SArray* pBlkWinIdx = *((SArray**)pNode->data + 1);
×
2349
  int64_t* pIdx = taosArrayGetLast(pBlkWinIdx);
×
NEW
2350
  if (pIdx && *(int32_t*)pIdx < pExtW->pTGrpCtx->pCCtx->blkWinStartIdx) {
×
2351
    return true;
×
2352
  }
2353

2354
  return false;
×
2355
}
2356

2357
static SList** extWinReserveGetBlockList(SExternalWindowOperator* pExtW, SArray* pOutputBlocks, int32_t winIdx) {
4,089,692✔
2358
  SList** ppList = NULL;
4,089,692✔
2359
  if (taosArrayGetSize(pOutputBlocks) > winIdx) {
4,089,692✔
NEW
2360
    ppList = taosArrayGet(pOutputBlocks, winIdx);
×
NEW
2361
    extWinRecycleBlockList(pExtW, ppList);
×
2362
  } else {
2363
    ppList = taosArrayReserve(pOutputBlocks, 1);
4,089,692✔
2364
  }
2365

2366
  return ppList;
4,089,692✔
2367
}
2368

2369
static int32_t extWinEnsureBlockList(SExternalWindowOperator* pExtW, int32_t winIdx, SList** ppList) {
6,090,704✔
2370
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
6,090,704✔
2371

2372
  *ppList = taosArrayGetP(pExtW->pOutputBlocks, winIdx);
6,090,704✔
2373
  if (*ppList != NULL) {
6,090,704✔
2374
    return code;
6,090,704✔
2375
  }
2376

NEW
2377
  SList** ppSlot = extWinReserveGetBlockList(pExtW, pExtW->pOutputBlocks, winIdx);
×
NEW
2378
  TSDB_CHECK_NULL(ppSlot, code, lino, _exit, terrno);
×
2379

NEW
2380
  *ppList = tdListNew(POINTER_BYTES * 2);
×
NEW
2381
  TSDB_CHECK_NULL(*ppList, code, lino, _exit, terrno);
×
2382

NEW
2383
  *ppSlot = *ppList;
×
2384

NEW
2385
_exit:
×
NEW
2386
  return code;
×
2387
}
2388

2389
static int32_t extWinGetWinResBlock(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
10,180,396✔
2390
  SExternalWindowOperator* pExtW = pOperator->info;
10,180,396✔
2391
  SList*                   pList = NULL;
10,180,396✔
2392
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
10,180,396✔
2393
  
2394
  if (pWin->resWinIdx >= 0) {
10,180,396✔
2395
    TAOS_CHECK_EXIT(extWinEnsureBlockList(pExtW, pWin->resWinIdx, &pList));
2,424✔
2396
  } else {
2397
    if (extWinLastWinClosed(pExtW)) {
10,177,972✔
2398
      pWin->resWinIdx = pExtW->pTGrpCtx->pCCtx->lastWinIdx;
6,088,280✔
2399
      TAOS_CHECK_EXIT(extWinEnsureBlockList(pExtW, pWin->resWinIdx, &pList));
6,088,280✔
2400
    } else {
2401
      pWin->resWinIdx = pExtW->resWinIdx++;
4,089,692✔
2402
      pList = tdListNew(POINTER_BYTES * 2);
4,089,692✔
2403
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);
4,089,692✔
2404
      SList** ppList = extWinReserveGetBlockList(pExtW, pExtW->pOutputBlocks, pWin->resWinIdx);
4,089,692✔
2405
      TSDB_CHECK_NULL(ppList, code, lino, _exit, terrno);
4,089,692✔
2406
      *ppList = pList;
4,089,692✔
2407
    }
2408
  }
2409

2410
  pExtW->pTGrpCtx->pCCtx->lastWinIdx = pWin->resWinIdx;
10,180,396✔
2411
  
2412
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pOperator, pExtW, pList, rows, ppRes, ppIdx));
10,180,396✔
2413

2414
_exit:
10,180,396✔
2415

2416
  if (code) {
10,180,396✔
2417
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
2418
  }
2419

2420
  return code;
10,180,396✔
2421
}
2422

2423
static int32_t extWinProjectDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
10,180,396✔
2424
  SExternalWindowOperator* pExtW = pOperator->info;
10,180,396✔
2425
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
10,180,396✔
2426
  SSDataBlock*             pResBlock = NULL;
10,180,396✔
2427
  SArray*                  pIdx = NULL;
10,180,396✔
2428
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
10,180,396✔
2429
  
2430
  TAOS_CHECK_EXIT(extWinGetWinResBlock(pOperator, rows, pWin, &pResBlock, &pIdx));
10,180,396✔
2431

2432
  qDebug("%s %s win[%" PRId64 ", %" PRId64 "] got res block %p winRowIdx %p, winOutIdx:%d, capacity:%d", 
10,180,396✔
2433
      pOperator->pTaskInfo->id.str, __func__, pWin->tw.skey, pWin->tw.ekey, pResBlock, pIdx, pWin->resWinIdx, pResBlock->info.capacity);
2434
  
2435
  if (!pExtW->pTmpBlock) {
10,180,396✔
2436
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
26,260✔
2437
  } else {
2438
    blockDataCleanup(pExtW->pTmpBlock);
10,154,136✔
2439
  }
2440
  
2441
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
10,180,396✔
2442

2443
  qDebug("%s %s start to copy %d rows to tmp blk", pOperator->pTaskInfo->id.str, __func__, rows);
10,180,396✔
2444
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
10,180,396✔
2445

2446
  qDebug("%s %s start to apply project to tmp blk", pOperator->pTaskInfo->id.str, __func__);
10,180,396✔
2447
  TAOS_CHECK_EXIT(projectApplyFunctionsWithSelect(pExprSup->pExprInfo, pResBlock, pExtW->pTmpBlock, pExprSup->pCtx,
10,180,396✔
2448
                                                  pExprSup->numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo),
2449
                                                  true, pExprSup->hasIndefRowsFunc));
2450

2451
  // propagate both ids so downstream (e.g., non-agg output/filters) can align with trigger group
2452
  pResBlock->info.id.groupId = pInputBlock->info.id.groupId;
10,176,356✔
2453
  pResBlock->info.id.baseGId = pInputBlock->info.id.baseGId;
10,180,396✔
2454

2455
  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock, extWinGetCurWinIdx(pOperator), rows));
10,179,992✔
2456

2457
_exit:
10,178,780✔
2458

2459
  if (code) {
10,173,932✔
2460
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
2461
  } else {
2462
    qDebug("%s %s project succeed", pOperator->pTaskInfo->id.str, __func__);
10,173,932✔
2463
  }
2464
  
2465
  return code;
10,179,588✔
2466
}
2467

2468
static int32_t extWinProjectOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
34,744✔
2469
  SExternalWindowOperator* pExtW = pOperator->info;
34,744✔
2470
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
34,744✔
2471
  SExtWinTimeWindow*       pWin = NULL;
34,744✔
2472
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
34,744✔
2473
  int32_t                  startPos = 0, winRows = 0;
34,744✔
2474
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
34,744✔
2475
  
2476
  while (true) {
2477
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
10,215,140✔
2478
    if (pWin == NULL) {
10,215,140✔
2479
      break;
34,744✔
2480
    }
2481

2482
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") tgrp %" PRId64 " project start, ascScan:%d, startPos:%d, winRows:%d",
10,180,396✔
2483
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, pExtW->lastTGrpId, ascScan, startPos, winRows);        
2484
    
2485
    TAOS_CHECK_EXIT(extWinProjectDo(pOperator, pInputBlock, startPos, winRows, pWin));
10,180,396✔
2486
    
2487
    startPos += winRows;
10,180,396✔
2488
  }
2489
  
2490
_exit:
34,744✔
2491

2492
  if (code) {
34,744✔
2493
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
2494
  }
2495

2496
  return code;
34,744✔
2497
}
2498

2499
static int32_t extWinIndefRowsDoImpl(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
×
2500
  SExternalWindowOperator* pExtW = pOperator->info;
×
2501
  SOptrBasicInfo*     pInfo = &pExtW->binfo;
×
2502
  SExprSupp*          pSup = &pOperator->exprSupp;
×
2503
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
×
2504
  int32_t order = pInfo->inputTsOrder;
×
2505
  int32_t scanFlag = pBlock->info.scanFlag;
×
2506
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
×
2507

2508
  SExprSupp* pScalarSup = &pExtW->scalarSupp;
×
2509
  if (pScalarSup->pExprInfo != NULL) {
×
2510
    TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
2511
                                 pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
2512
  }
2513

2514
  TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, order, scanFlag, false));
×
2515

2516
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));
×
2517

2518
  TAOS_CHECK_EXIT(projectApplyFunctions(pSup->pExprInfo, pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
×
2519
                               pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
2520

2521
_exit:
×
2522

2523
  if (code) {
×
2524
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
2525
  }
2526

2527
  return code;
×
2528
}
2529

2530
static int32_t extWinIndefRowsSetWinOutputBuf(SExternalWindowOperator* pExtW, SExtWinTimeWindow* win, SExprSupp* pSupp, 
×
2531
                                     SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo, bool reset) {
2532
  int32_t code = 0, lino = 0;
×
2533
  SResultRow* pResultRow = NULL;
×
2534

NEW
2535
  TAOS_CHECK_EXIT(extWinGetResultRow(pTaskInfo, pExtW, win->resWinIdx, pAggSup->resultRowSize, &pResultRow));
×
2536
  
NEW
2537
  qDebug("set indefRows tgrp %" PRIu64 " window [%" PRId64 ", %" PRId64 "] outIdx:%d", pExtW->lastTGrpId, win->tw.skey, win->tw.ekey, win->resWinIdx);
×
2538

2539
  if (reset) {
×
2540
    memset(pResultRow, 0, pAggSup->resultRowSize);
×
2541
    for (int32_t k = 0; k < pSupp->numOfExprs; ++k) {
×
2542
      SqlFunctionCtx* pCtx = &pSupp->pCtx[k];
×
2543
      pCtx->pOutput = NULL;
×
2544
    }
2545
  }
2546

2547
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, &win->tw);
×
2548

2549
  // set time window for current result
2550
  TAOS_CHECK_EXIT(setResultRowInitCtx(pResultRow, pSupp->pCtx, pSupp->numOfExprs, pSupp->rowEntryInfoOffset));
×
2551

2552
_exit:
×
2553
  
2554
  if (code) {
×
2555
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2556
  }
2557

2558
  return code;
×
2559
}
2560

2561
static int32_t extWinGetSetWinResBlockBuf(SOperatorInfo* pOperator, int32_t rows, SExtWinTimeWindow* pWin, SSDataBlock** ppRes, SArray** ppIdx) {
×
2562
  SExternalWindowOperator* pExtW = pOperator->info;
×
2563
  SList*                   pList = NULL;
×
2564
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
2565
  
NEW
2566
  if (pWin->resWinIdx >= 0) {
×
NEW
2567
    TAOS_CHECK_EXIT(extWinEnsureBlockList(pExtW, pWin->resWinIdx, &pList));
×
UNCOV
2568
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, false));
×
2569
  } else {
2570
    if (extWinLastWinClosed(pExtW)) {
×
NEW
2571
      pWin->resWinIdx = pExtW->pTGrpCtx->pCCtx->lastWinIdx;
×
NEW
2572
      TAOS_CHECK_EXIT(extWinEnsureBlockList(pExtW, pWin->resWinIdx, &pList));
×
2573
    } else {
NEW
2574
      pWin->resWinIdx = pExtW->resWinIdx++;
×
2575
      pList = tdListNew(POINTER_BYTES * 2);
×
2576
      TSDB_CHECK_NULL(pList, code, lino, _exit, terrno);
×
NEW
2577
      SList** ppList = extWinReserveGetBlockList(pExtW, pExtW->pOutputBlocks, pWin->resWinIdx);
×
NEW
2578
      TSDB_CHECK_NULL(ppList, code, lino, _exit, terrno);
×
UNCOV
2579
      *ppList = pList;
×
2580
    }
2581
    TAOS_CHECK_EXIT(extWinIndefRowsSetWinOutputBuf(pExtW, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo, true));
×
2582
  }
2583

NEW
2584
  pExtW->pTGrpCtx->pCCtx->lastWinIdx = pWin->resWinIdx;
×
2585
  
NEW
2586
  TAOS_CHECK_EXIT(extWinGetLastBlockFromList(pOperator, pExtW, pList, rows, ppRes, ppIdx));
×
2587

2588
_exit:
×
2589

2590
  if (code) {
×
2591
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
2592
  }
2593

2594
  return code;
×
2595
}
2596

2597

2598
static int32_t extWinIndefRowsDo(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos, int32_t rows, SExtWinTimeWindow* pWin) {
×
2599
  SExternalWindowOperator* pExtW = pOperator->info;
×
2600
  SSDataBlock*             pResBlock = NULL;
×
2601
  SArray*                  pIdx = NULL;
×
2602
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
2603
  
2604
  TAOS_CHECK_EXIT(extWinGetSetWinResBlockBuf(pOperator, rows, pWin, &pResBlock, &pIdx));
×
2605
  
2606
  if (!pExtW->pTmpBlock) {
×
2607
    TAOS_CHECK_EXIT(createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock));
×
2608
  } else {
2609
    blockDataCleanup(pExtW->pTmpBlock);
×
2610
  }
2611
  
2612
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rows)));
×
2613

2614
  TAOS_CHECK_EXIT(blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rows));
×
2615
  TAOS_CHECK_EXIT(extWinIndefRowsDoImpl(pOperator, pResBlock, pExtW->pTmpBlock));
×
2616

NEW
2617
  pResBlock->info.id.groupId = pInputBlock->info.id.groupId;
×
2618

NEW
2619
  TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock, extWinGetCurWinIdx(pOperator), rows));
×
2620

2621
_exit:
×
2622

2623
  if (code) {
×
2624
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
2625
  }
2626
  
2627
  return code;
×
2628
}
2629

2630

2631
static int32_t extWinIndefRowsOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
×
2632
  SExternalWindowOperator* pExtW = pOperator->info;
×
2633
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
×
2634
  SExtWinTimeWindow*       pWin = NULL;
×
2635
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
×
2636
  int32_t                  startPos = 0, winRows = 0;
×
2637
  int32_t                  code = TSDB_CODE_SUCCESS, lino = 0;
×
2638
  
2639
  while (true) {
2640
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
×
2641
    if (pWin == NULL) {
×
2642
      break;
×
2643
    }
2644

NEW
2645
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") tgrp %" PRId64 " indefRows start, ascScan:%d, startPos:%d, winRows:%d",
×
2646
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, pExtW->lastTGrpId, ascScan, startPos, winRows);        
2647
    
2648
    TAOS_CHECK_EXIT(extWinIndefRowsDo(pOperator, pInputBlock, startPos, winRows, pWin));
×
2649
    
2650
    startPos += winRows;
×
2651
  }
2652
  
2653
_exit:
×
2654

2655
  if (code) {
×
2656
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
2657
  }
2658

2659
  return code;
×
2660
}
2661

2662

2663
static int32_t extWinNonAggOutputSingleGrpRes(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, SSDataBlock** ppRes) {
4,050,504✔
2664
  SExtWinCalcGrpCtx*  pCCtx = pExtW->pTGrpCtx->pCCtx;
4,050,504✔
2665
  int32_t         numOfWin = taosArrayGetSize(pCCtx->pWins);
4,059,392✔
2666
  int32_t         code = TSDB_CODE_SUCCESS;
4,058,988✔
2667
  int32_t         lino = 0;
4,058,988✔
2668
  SSDataBlock*    pRes = NULL;
4,058,988✔
2669

2670
  for (; pCCtx->outWinIdx < numOfWin && pCCtx->outWinLastIdx < pCCtx->lastWinIdx; pCCtx->outWinIdx += 1, extWinIncCurWinOutIdx(pOperator)) {
4,058,988✔
2671
    SExtWinTimeWindow* pWin = TARRAY_GET_ELEM(pCCtx->pWins, pCCtx->outWinIdx);
4,051,312✔
2672
    if (pWin->resWinIdx < 0 || pWin->resWinIdx == pCCtx->outWinLastIdx) {
4,051,312✔
NEW
2673
      continue;
×
2674
    }
2675

2676
    pCCtx->outWinLastIdx = pWin->resWinIdx;
4,051,312✔
2677
    SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pWin->resWinIdx);
4,051,312✔
2678
    if (listNEles(pList) <= 0) {
4,050,908✔
2679
      continue;
×
2680
    }
2681

2682
    SListNode* pNode = tdListPopHead(pList);
4,051,716✔
2683
    pRes = *(SSDataBlock**)pNode->data;
4,051,716✔
2684
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = *(SArray**)((SArray**)pNode->data + 1);
4,051,716✔
2685
    pExtW->pLastBlkNode = pNode;
4,051,716✔
2686

2687
    if (listNEles(pList) <= 0) {
4,051,716✔
2688
      pCCtx->outWinIdx++;
4,051,716✔
2689
      extWinIncCurWinOutIdx(pOperator);
4,051,716✔
2690
    }
2691

2692
    break;
4,051,312✔
2693
  }
2694

2695
_exit:
7,676✔
2696

2697
  *ppRes = pRes;
4,058,988✔
2698

2699
  if (code != TSDB_CODE_SUCCESS) {
4,058,988✔
NEW
2700
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2701
  }
2702
  
2703
  return code;
4,058,988✔
2704
}
2705

2706
static int32_t extWinNonAggOutputMultiGrpRes(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, SSDataBlock** ppRes) {
4,071,108✔
2707
  int32_t         code = TSDB_CODE_SUCCESS;
4,071,108✔
2708
  int32_t         lino = 0;
4,071,108✔
2709
  SStreamRuntimeFuncInfo* pStream = &pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo;
4,071,108✔
2710
  SSDataBlock*    pBlock = NULL;
4,071,108✔
2711

2712
  if (pStream->curGrpCalc) {
4,071,108✔
2713
    TAOS_CHECK_EXIT(extWinNonAggOutputSingleGrpRes(pOperator, pExtW, &pBlock));
4,051,716✔
2714
  }
2715
  if (pBlock == NULL || pBlock->info.rows == 0) {
4,070,704✔
2716
    pStream->curGrpCalc = tSimpleHashIterate(pStream->pGroupCalcInfos, pStream->curGrpCalc, &pExtW->lastOutputIter);
27,068✔
2717
    while (pStream->curGrpCalc != NULL) {
50,096✔
2718
      if (pStream->curGrpCalc->pRunnerGrpCtx) {
30,704✔
2719
        pExtW->lastTGrpId = *(uint64_t*)tSimpleHashGetKey(pStream->curGrpCalc, NULL);
15,352✔
2720
        pExtW->pTGrpCtx = pStream->curGrpCalc->pRunnerGrpCtx;
7,676✔
2721
        pExtW->ownTGrpCtx = false;
7,676✔
2722
        
2723
        TAOS_CHECK_EXIT(extWinNonAggOutputSingleGrpRes(pOperator, pExtW, &pBlock));
7,676✔
2724
        if (pBlock != NULL && pBlock->info.rows > 0) {
7,676✔
2725
          break;
7,676✔
2726
        }
2727
      }
2728
      
2729
      pStream->curGrpCalc = tSimpleHashIterate(pStream->pGroupCalcInfos, pStream->curGrpCalc, &pExtW->lastOutputIter);
23,028✔
2730
    }
2731
  }
2732

2733
  extWinPostUpdateStreamRt(pStream, pOperator, pExtW);
4,070,704✔
2734

2735
_exit:
4,071,108✔
2736

2737
  *ppRes = pBlock;
4,071,108✔
2738

2739
  if (code != TSDB_CODE_SUCCESS) {
4,071,108✔
NEW
2740
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2741
  }
2742

2743
  return code;
4,071,108✔
2744
}
2745

2746

2747
static int32_t extWinNonAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
4,139,384✔
2748
  SExternalWindowOperator* pExtW = pOperator->info;
4,139,384✔
2749
  int32_t                  numOfWin = pExtW->resWinIdx;
4,139,384✔
2750
  int32_t                  code = TSDB_CODE_SUCCESS;
4,138,576✔
2751
  int32_t                  lino = 0;
4,138,576✔
2752
  SSDataBlock*             pRes = NULL;
4,138,576✔
2753
  SStreamRuntimeFuncInfo*  pStream = &pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo;
4,138,980✔
2754

2755
  if (pStream->isMultiGroupCalc) {
4,139,384✔
2756
    TAOS_CHECK_EXIT(extWinNonAggOutputMultiGrpRes(pOperator, pExtW, &pRes));
4,070,704✔
2757
  } else {
2758
    for (; pExtW->outWinIdx < numOfWin; pExtW->outWinIdx++, extWinIncCurWinOutIdx(pOperator)) {
68,680✔
2759
      SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outWinIdx);
38,380✔
2760
      if (listNEles(pList) <= 0) {
38,380✔
NEW
2761
        continue;
×
2762
      }
2763

2764
      SListNode* pNode = tdListPopHead(pList);
38,380✔
2765
      pRes = *(SSDataBlock**)pNode->data;
38,380✔
2766
      pStream->pStreamBlkWinIdx = *(SArray**)((SArray**)pNode->data + 1);
38,380✔
2767
      pExtW->pLastBlkNode = pNode;
38,380✔
2768

2769
      if (listNEles(pList) <= 0) {
38,380✔
2770
        pExtW->outWinIdx++;
37,168✔
2771
        extWinIncCurWinOutIdx(pOperator);
37,168✔
2772
      }
2773

2774
      break;
38,380✔
2775
    }
2776
  }
2777

2778
  if (pRes) {
4,139,788✔
2779
    qDebug("%s result generated, rows:%" PRId64 , GET_TASKID(pOperator->pTaskInfo), pRes->info.rows);
4,090,096✔
2780
    pRes->info.version = pOperator->pTaskInfo->version;
4,090,096✔
2781
    pRes->info.dataLoad = 1;
4,090,096✔
2782
  } else {
2783
    pStream->pStreamBlkWinIdx = NULL;
49,692✔
2784
    qDebug("%s ext window done", GET_TASKID(pOperator->pTaskInfo));
49,692✔
2785
  }
2786

2787
  *ppRes = (pRes && pRes->info.rows > 0) ? pRes : NULL;
4,139,788✔
2788

2789
_exit:
4,139,788✔
2790

2791
  if (code != TSDB_CODE_SUCCESS) {
4,139,788✔
2792
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2793
  }
2794

2795
  return code;
4,139,788✔
2796
}
2797

2798
static int32_t extWinAggHandleEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains, SExtWinTimeWindow* pWin) {
2,147,483,647✔
2799
  int32_t code = 0, lino = 0;
2,147,483,647✔
2800
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
2,147,483,647✔
2801
  SExprSupp* pSup = &pOperator->exprSupp;
2,147,483,647✔
2802
  int32_t currIdx = extWinGetCurWinIdx(pOperator);
2,147,483,647✔
2803

2804
  if (NULL == pExtW->pEmptyInputBlock || (pWin && pWin->tw.skey == pExtW->pTGrpCtx->pCCtx->lastSKey &&
2,147,483,647✔
NEW
2805
                                          pWin->tw.ekey == pExtW->pTGrpCtx->pCCtx->lastEKey)) {
×
2806
    goto _exit;
2,147,483,647✔
2807
  }
2808

UNCOV
2809
  bool ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
×
NEW
2810
  int32_t endIdx = allRemains ? (pExtW->pTGrpCtx->pCCtx->pWins->size - 1) : (currIdx - 1);
×
UNCOV
2811
  SResultRowInfo* pResultRowInfo = &pExtW->binfo.resultRowInfo;
×
UNCOV
2812
  SSDataBlock* pInput = pExtW->pEmptyInputBlock;
×
2813

NEW
2814
  if ((pExtW->pTGrpCtx->pCCtx->lastWinId + 1) <= endIdx) {
×
UNCOV
2815
    TAOS_CHECK_EXIT(setInputDataBlock(pSup, pExtW->pEmptyInputBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
×
2816
  }
2817
  
NEW
2818
  for (int32_t i = pExtW->pTGrpCtx->pCCtx->lastWinId + 1; i <= endIdx; ++i) {
×
NEW
2819
    SExtWinTimeWindow* pWin = taosArrayGet(pExtW->pTGrpCtx->pCCtx->pWins, i);
×
2820

UNCOV
2821
    extWinSetCurWinIdx(pOperator, i);
×
NEW
2822
    qDebug("%s tgrp %" PRIu64 " cgrp %" PRIu64 " %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
×
2823
           GET_TASKID(pOperator->pTaskInfo), pExtW->lastTGrpId, pExtW->lastCGrpId, i, pWin->tw.skey, pWin->tw.ekey, ascScan);
2824

UNCOV
2825
    TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pWin, pSup, &pExtW->aggSup, pOperator->pTaskInfo));
×
2826

UNCOV
2827
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
×
UNCOV
2828
    code = extWinAggDo(pOperator, 0, 1, pInput);
×
NEW
2829
    pExtW->pTGrpCtx->pCCtx->lastWinId = i;  
×
UNCOV
2830
    TAOS_CHECK_EXIT(code);
×
2831
  }
2832

2833
  
UNCOV
2834
_exit:
×
2835

2836
  if (code) {
2,147,483,647✔
2837
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
2838
  } else {
2839
    if (pBlock) {
2,147,483,647✔
2840
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true));
2,147,483,647✔
2841
    }
2842

2843
    if (!allRemains) {
2,147,483,647✔
2844
      extWinSetCurWinIdx(pOperator, currIdx);  
2,147,483,647✔
2845
    }
2846
  }
2847

2848
  return code;
2,147,483,647✔
2849
}
2850

2851
static int32_t extWinAggOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
4,364,926✔
2852
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
4,364,926✔
2853
  int32_t                  startPos = 0, winRows = 0;
4,364,926✔
2854
  int64_t*                 tsCol = extWinExtractTsCol(pInputBlock, pExtW->primaryTsIndex, pOperator->pTaskInfo);
4,364,926✔
2855
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
4,364,926✔
2856
  int32_t                  code = 0, lino = 0;
4,364,926✔
2857
  SExtWinTimeWindow*       pWin = NULL;
4,364,926✔
2858
  bool                     scalarCalc = false;
4,364,926✔
2859

2860
  while (true) {
2861
    TAOS_CHECK_EXIT((*pExtW->getWinFp)(pOperator, tsCol, &startPos, &pInputBlock->info, &pWin, &winRows));
2,147,483,647✔
2862
    if (pWin == NULL) {
2,147,483,647✔
2863
      break;
4,364,926✔
2864
    }
2865

2866
    TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, pInputBlock, false, pWin));
2,147,483,647✔
2867

2868
    qDebug("%s ext window [%" PRId64 ", %" PRId64 ") tgrp %" PRIu64 " cgrp %" PRIu64 " agg start, ascScan:%d, startPos:%d, winRows:%d",
2,147,483,647✔
2869
           GET_TASKID(pOperator->pTaskInfo), pWin->tw.skey, pWin->tw.ekey, pExtW->lastTGrpId, pExtW->lastCGrpId, ascScan, startPos, winRows);        
2870

2871
    if (!scalarCalc) {
2,147,483,647✔
2872
      if (pExtW->scalarSupp.pExprInfo) {
4,230,748✔
UNCOV
2873
        SExprSupp* pScalarSup = &pExtW->scalarSupp;
×
UNCOV
2874
        TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pInputBlock, pInputBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
2875
                                     pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
2876
      }
2877
      
2878
      scalarCalc = true;
4,230,748✔
2879
    }
2880

2881
    if (pWin->tw.skey != pExtW->pTGrpCtx->pCCtx->lastSKey || pWin->tw.ekey != pExtW->pTGrpCtx->pCCtx->lastEKey ||
2,147,483,647✔
2882
        pWin->tw.skey == INT64_MIN) {
722,192✔
2883
      TAOS_CHECK_EXIT(
2,147,483,647✔
2884
          extWinAggSetWinOutputBuf(pOperator, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo));
2885
    }
2886

2887
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
2,147,483,647✔
2888
    TAOS_CHECK_EXIT(extWinAggDo(pOperator, startPos, winRows, pInputBlock));
2,147,483,647✔
2889
    
2890
    pExtW->pTGrpCtx->pCCtx->lastSKey = pWin->tw.skey;
2,147,483,647✔
2891
    pExtW->pTGrpCtx->pCCtx->lastEKey = pWin->tw.ekey;
2,147,483,647✔
2892
    pExtW->pTGrpCtx->pCCtx->lastWinId = extWinGetCurWinIdx(pOperator);
2,147,483,647✔
2893
    startPos += winRows;
2,147,483,647✔
2894
  }
2895

2896
_exit:
4,364,926✔
2897

2898
  if (code) {
4,364,926✔
2899
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
2900
  }
2901

2902
  return code;
4,364,926✔
2903
}
2904

2905
static int32_t extWinAggOutputSingleCGrpRes(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, bool* grpDone) {
1,583,723✔
2906
  int32_t            code = TSDB_CODE_SUCCESS;
1,583,723✔
2907
  int32_t            lino = 0;
1,583,723✔
2908
  SExprInfo*         pExprInfo = pOperator->exprSupp.pExprInfo;
1,583,723✔
2909
  int32_t            numOfExprs = pOperator->exprSupp.numOfExprs;
1,583,723✔
2910
  int32_t*           rowEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;
1,583,723✔
2911
  SqlFunctionCtx*    pCtx = pOperator->exprSupp.pCtx;
1,583,723✔
2912
  SSDataBlock*       pBlock = pExtW->binfo.pRes;
1,583,723✔
2913
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
1,583,723✔
2914
  SExtWinTrigGrpCtx* pTGrpCtx = pExtW->pTGrpCtx;
1,583,723✔
2915
  if (!pExtW->pTGrpCtx) {
1,583,723✔
2916
    return TSDB_CODE_SUCCESS;
88,476✔
2917
  }
2918
  SExtWinCalcGrpCtx* pCCtx = pTGrpCtx->pCCtx;
1,495,247✔
2919
  int32_t            numOfWin = taosArrayGetSize(pCCtx->pWins);
1,495,247✔
2920

2921
  // For vtable COLS merge (isDynWindow), iterate all windows including empty ones to output NULL placeholder rows;
2922
  // for normal queries, stop at outWinTotalNum (windows with actual results).
2923
  for (; pCCtx->outWinIdx < numOfWin && (pExtW->isDynWindow || pCCtx->outWinNum < pCCtx->outWinTotalNum); pCCtx->outWinIdx += 1) {
2,147,483,647✔
2924
    SExtWinTimeWindow* pWin = TARRAY_GET_ELEM(pCCtx->pWins, pCCtx->outWinIdx);
2,147,483,647✔
2925
    bool emptyWin = false;
2,147,483,647✔
2926

2927
    if (pWin->resWinIdx < 0) {
2,147,483,647✔
2928
      emptyWin = true;
225,901,349✔
2929
    } else {
2930
      SResultRow* pRow = (SResultRow*)((char*)pExtW->resultRows.pResultRows[pWin->resWinIdx / pExtW->resultRows.resRowSize] + (pWin->resWinIdx % pExtW->resultRows.resRowSize) * pExtW->aggSup.resultRowSize);
2,147,483,647✔
2931
      doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
2,147,483,647✔
2932
      if (pRow->numOfRows == 0) {
2,147,483,647✔
NEW
2933
        emptyWin = true;
×
2934
      }
2935
    }
2936

2937
    if (emptyWin) {
2,147,483,647✔
2938
      if (!pExtW->isDynWindow) {
225,901,349✔
2939
        continue;
124,028✔
2940
      }
2941
      // Output a NULL row to keep row count aligned with window count in COLS merge
2942
      if (pBlock->info.rows + 1 > pBlock->info.capacity) {
225,777,321✔
NEW
2943
        uint32_t newSize = pBlock->info.rows + 1 + numOfWin - pCCtx->outWinIdx;
×
NEW
2944
        TAOS_CHECK_EXIT(blockDataEnsureCapacity(pBlock, newSize));
×
2945
      }
2946
      for (int32_t j = 0; j < taosArrayGetSize(pBlock->pDataBlock); ++j) {
572,554,653✔
2947
        SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, j);
346,777,332✔
2948
        if (pColInfo) {
346,777,332✔
2949
          colDataSetNULL(pColInfo, pBlock->info.rows);
346,777,332✔
2950
        }
2951
      }
2952
      pBlock->info.rows += 1;
225,777,321✔
2953
      pCCtx->outWinNum++;
225,777,321✔
2954
      TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pCCtx->outWinIdx, 1));
225,777,321✔
2955

2956
      if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
225,777,321✔
NEW
2957
        ++pCCtx->outWinIdx;
×
NEW
2958
        break;
×
2959
      }
2960
      continue;
225,777,321✔
2961
    }
2962

2963
    pCCtx->outWinNum++;
2,147,483,647✔
2964
    SResultRow* pRow = (SResultRow*)((char*)pExtW->resultRows.pResultRows[pWin->resWinIdx / pExtW->resultRows.resRowSize] + (pWin->resWinIdx % pExtW->resultRows.resRowSize) * pExtW->aggSup.resultRowSize);
2,147,483,647✔
2965

2966
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
2,147,483,647✔
2967
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + (pExtW->isDynWindow ? numOfWin - pCCtx->outWinIdx : pCCtx->outWinTotalNum - pCCtx->outWinNum);
1,172,922✔
2968
      TAOS_CHECK_EXIT(blockDataEnsureCapacity(pBlock, newSize));
1,172,922✔
2969
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
1,172,922✔
2970
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
2971
    }
2972

2973
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pRow->win, 0);
2,147,483,647✔
2974
    TAOS_CHECK_EXIT(copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo));
2,147,483,647✔
2975

2976
    pBlock->info.rows += pRow->numOfRows;
2,147,483,647✔
2977

2978
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pExtW->pWinRowIdx, pBlock, pRow->winIdx, pRow->numOfRows));
2,147,483,647✔
2979

2980
    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
2,147,483,647✔
2981
      ++pCCtx->outWinIdx;
1,027,418✔
2982
      break;
1,027,418✔
2983
    }
2984
  }
2985

2986
  if (grpDone) {
1,495,247✔
2987
    // isDynWindow: done when all windows (including empty) are emitted; otherwise done when result count is met
2988
    *grpDone = pExtW->isDynWindow ? (pCCtx->outWinIdx >= numOfWin) : (pCCtx->outWinIdx >= numOfWin || pCCtx->outWinNum >= pCCtx->outWinTotalNum);
13,332✔
2989
  }
2990

2991
_exit:
1,481,915✔
2992

2993
  if (code != TSDB_CODE_SUCCESS) {
1,495,247✔
NEW
2994
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2995
  }
2996
  
2997
  return code;
1,495,247✔
2998
}
2999

3000
static int32_t extWinAggOutputMulNoOrderCGrpsRes(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW) {
173,720✔
3001
  int32_t code = 0, lino = 0;
173,720✔
3002
  SExtWinTrigGrpCtx* pTGrpCtx = pExtW->pTGrpCtx;
173,720✔
3003
  SSDataBlock*    pBlock = pExtW->binfo.pRes;
173,720✔
3004

3005
  if (pTGrpCtx == NULL) {
173,720✔
3006
    return TSDB_CODE_SUCCESS;
39,592✔
3007
  }
3008

3009
  if (pTGrpCtx->pCCtx) {
134,128✔
3010
    pExtW->lastCGrpId = pTGrpCtx->pCCtx->groupId;
78,376✔
3011
    TAOS_CHECK_EXIT(extWinAggOutputSingleCGrpRes(pOperator, pExtW, NULL));
78,376✔
3012
  }
3013
  if (0 == pBlock->info.rows) {
134,128✔
3014
    pTGrpCtx->pCCtx = tSimpleHashIterate(pTGrpCtx->pCGCtxs, pTGrpCtx->pCCtx, &pTGrpCtx->lastCtxIter);
133,320✔
3015
    while (pTGrpCtx->pCCtx != NULL) {
235,128✔
3016
      pExtW->lastCGrpId = pTGrpCtx->pCCtx->groupId;
179,376✔
3017
      TAOS_CHECK_EXIT(extWinAggOutputSingleCGrpRes(pOperator, pExtW, NULL));
179,376✔
3018
      if (pBlock->info.rows > 0) {
179,376✔
3019
        break;
77,568✔
3020
      }
3021
      
3022
      pTGrpCtx->pCCtx = tSimpleHashIterate(pTGrpCtx->pCGCtxs, pTGrpCtx->pCCtx, &pTGrpCtx->lastCtxIter);
101,808✔
3023
    }
3024
  }
3025

3026
_exit:
56,560✔
3027

3028
  if (code != TSDB_CODE_SUCCESS) {
134,128✔
NEW
3029
    qError("%s %s failed at line %d since %s", GET_TASKID(pOperator->pTaskInfo), __func__, lino, tstrerror(code));
×
3030
  }
3031

3032
  return code;
134,128✔
3033
}
3034

3035

3036

3037
static int32_t extWinAggOutputMulNoOrderTGrpsRes(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW) {
60,600✔
3038
  int32_t         code = TSDB_CODE_SUCCESS;
60,600✔
3039
  int32_t         lino = 0;
60,600✔
3040
  SStreamRuntimeFuncInfo* pStream = &pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo;
60,600✔
3041
  SSDataBlock*    pBlock = pExtW->binfo.pRes;
60,600✔
3042

3043
  if (pStream->curGrpCalc) {
60,600✔
3044
    pExtW->lastTGrpId = *(uint64_t*)tSimpleHashGetKey(pStream->curGrpCalc, NULL);
35,552✔
3045
    pExtW->pTGrpCtx = pStream->curGrpCalc->pRunnerGrpCtx;
17,776✔
3046
    pExtW->ownTGrpCtx = false;
17,776✔
3047
    if (pExtW->calcWithPartition) {
17,776✔
3048
      TAOS_CHECK_EXIT(extWinAggOutputMulNoOrderCGrpsRes(pOperator, pExtW));
17,372✔
3049
    } else {
NEW
3050
      TAOS_CHECK_EXIT(extWinAggOutputSingleCGrpRes(pOperator, pExtW, NULL));
×
3051
    }
3052
  }
3053
  if (0 == pBlock->info.rows) {
60,600✔
3054
    pStream->curGrpCalc = tSimpleHashIterate(pStream->pGroupCalcInfos, pStream->curGrpCalc, &pExtW->lastOutputIter);
59,792✔
3055
    while (pStream->curGrpCalc != NULL) {
137,360✔
3056
      if (pStream->curGrpCalc->pRunnerGrpCtx) {
94,536✔
3057
        pExtW->lastTGrpId = *(uint64_t*)tSimpleHashGetKey(pStream->curGrpCalc, NULL);
33,936✔
3058
        pExtW->pTGrpCtx = pStream->curGrpCalc->pRunnerGrpCtx;
16,968✔
3059
        pExtW->ownTGrpCtx = false;
16,968✔
3060
        pExtW->lastCGrpId = 0;
16,968✔
3061

3062
        if (pExtW->calcWithPartition) {
16,968✔
3063
          pExtW->pTGrpCtx->pCCtx = NULL;
16,968✔
3064
          TAOS_CHECK_EXIT(extWinAggOutputMulNoOrderCGrpsRes(pOperator, pExtW));
16,968✔
3065
        } else {
NEW
3066
          TAOS_CHECK_EXIT(extWinAggOutputSingleCGrpRes(pOperator, pExtW, NULL));
×
3067
        }
3068
        
3069
        if (pBlock->info.rows > 0) {
16,968✔
3070
          break;
16,968✔
3071
        }
3072
      }
3073
      
3074
      pStream->curGrpCalc = tSimpleHashIterate(pStream->pGroupCalcInfos, pStream->curGrpCalc, &pExtW->lastOutputIter);
77,568✔
3075
    }
3076
  }
3077

3078
  extWinPostUpdateStreamRt(pStream, pOperator, pExtW);
60,600✔
3079

3080
_exit:
60,600✔
3081

3082
  if (code != TSDB_CODE_SUCCESS) {
60,600✔
NEW
3083
    qError("%s %s failed at line %d since %s", GET_TASKID(pOperator->pTaskInfo), __func__, lino, tstrerror(code));
×
3084
  }
3085

3086
  return code;
60,600✔
3087
}
3088

3089

3090
static int32_t extWinAggOutputMulOrderCGrpsRes(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW) {
31,512✔
3091
  int32_t code = 0, lino = 0;
31,512✔
3092
  SExtWinTrigGrpCtx* pTGrpCtx = pExtW->pTGrpCtx;
31,512✔
3093
  SSDataBlock*    pBlock = pExtW->binfo.pRes;
31,512✔
3094
  int32_t grpNum = taosArrayGetSize(pExtW->pGrpIds);
31,512✔
3095
  bool grpDone = false;
31,512✔
3096

3097
  if (pTGrpCtx == NULL || pTGrpCtx->pCGCtxs == NULL) {
31,512✔
3098
    return TSDB_CODE_SUCCESS;
10,908✔
3099
  }
3100

3101
  for (; pExtW->lastGrpIdx < grpNum; ++pExtW->lastGrpIdx) {
20,604✔
3102
    uint64_t *grpId = taosArrayGet(pExtW->pGrpIds, pExtW->lastGrpIdx);
10,504✔
3103
    pTGrpCtx->pCCtx = tSimpleHashGet(pTGrpCtx->pCGCtxs, grpId, sizeof(*grpId));
10,504✔
3104
    TSDB_CHECK_NULL(pTGrpCtx->pCCtx, code, lino, _exit, terrno);
10,504✔
3105
    TAOS_CHECK_EXIT(extWinAggOutputSingleCGrpRes(pOperator, pExtW, &grpDone));
10,504✔
3106
    if (pBlock->info.rows > 0) {
10,504✔
3107
      if (grpDone) {
10,504✔
3108
        pExtW->lastGrpIdx++;
10,504✔
3109
      }
3110
      
3111
      break;
10,504✔
3112
    }
3113
  }
3114

3115
_exit:
20,604✔
3116

3117
  if (code != TSDB_CODE_SUCCESS) {
20,604✔
NEW
3118
    qError("%s %s failed at line %d since %s", GET_TASKID(pOperator->pTaskInfo), __func__, lino, tstrerror(code));
×
3119
  }
3120

3121
  return code;
20,604✔
3122
}
3123

NEW
3124
static int32_t extWinAggOutputMulOrderTGrpsRes(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW) {
×
NEW
3125
  int32_t code = 0, lino = 0;
×
NEW
3126
  SExtWinTrigGrpCtx* pTGrpCtx = pExtW->pTGrpCtx;
×
NEW
3127
  SSDataBlock*    pBlock = pExtW->binfo.pRes;
×
NEW
3128
  int32_t grpNum = taosArrayGetSize(pExtW->pGrpIds);
×
NEW
3129
  SStreamRuntimeFuncInfo* pStream = &pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo;
×
NEW
3130
  bool grpDone = false;
×
3131

NEW
3132
  for (; pExtW->lastGrpIdx < grpNum; ++pExtW->lastGrpIdx) {
×
NEW
3133
    uint64_t *grpId = taosArrayGet(pExtW->pGrpIds, pExtW->lastGrpIdx);
×
3134

NEW
3135
    pStream->curGrpCalc = tSimpleHashGet(pStream->pGroupCalcInfos, grpId, sizeof(*grpId));
×
NEW
3136
    TSDB_CHECK_NULL(pStream->curGrpCalc, code, lino, _exit, terrno);
×
NEW
3137
    pExtW->pTGrpCtx = pStream->curGrpCalc->pRunnerGrpCtx;
×
NEW
3138
    pExtW->ownTGrpCtx = false;
×
3139

NEW
3140
    TAOS_CHECK_EXIT(extWinAggOutputSingleCGrpRes(pOperator, pExtW, &grpDone));
×
NEW
3141
    if (pBlock->info.rows > 0) {
×
NEW
3142
      if (grpDone) {
×
NEW
3143
        pExtW->lastGrpIdx++;
×
3144
      }
3145

NEW
3146
      break;
×
3147
    }
3148
  }
3149

NEW
3150
  extWinPostUpdateStreamRt(pStream, pOperator, pExtW);
×
3151

UNCOV
3152
_exit:
×
3153

NEW
3154
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
3155
    qError("%s %s failed at line %d since %s", GET_TASKID(pOperator->pTaskInfo), __func__, lino, tstrerror(code));
×
3156
  }
3157

UNCOV
3158
  return code;
×
3159
}
3160

3161
static int32_t extWinAggOutputMulOrderTCGrpsRes(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW) {
9,292✔
3162
  int32_t code = 0, lino = 0;
9,292✔
3163
  SExtWinTrigGrpCtx* pTGrpCtx = pExtW->pTGrpCtx;
9,292✔
3164
  SSDataBlock*    pBlock = pExtW->binfo.pRes;
9,292✔
3165
  int32_t grpNum = taosArrayGetSize(pExtW->pCTGrpIds);
9,292✔
3166
  SStreamRuntimeFuncInfo* pStream = &pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo;
9,292✔
3167
  bool grpDone = false;
9,292✔
3168

3169
  for (; pExtW->lastGrpIdx < grpNum; ++pExtW->lastGrpIdx) {
9,292✔
3170
    uint64_t* cGrpId = taosArrayGet(pExtW->pCTGrpIds, pExtW->lastGrpIdx);
2,828✔
3171
    uint64_t* tGrpId = cGrpId + 1;
2,828✔
3172

3173
    pStream->curGrpCalc = tSimpleHashGet(pStream->pGroupCalcInfos, tGrpId, sizeof(*tGrpId));
2,828✔
3174
    TSDB_CHECK_NULL(pStream->curGrpCalc, code, lino, _exit, terrno);
2,828✔
3175
    pExtW->pTGrpCtx = pStream->curGrpCalc->pRunnerGrpCtx;
2,828✔
3176
    pExtW->ownTGrpCtx = false;
2,828✔
3177

3178
    pExtW->pTGrpCtx->pCCtx = tSimpleHashGet(pExtW->pTGrpCtx->pCGCtxs, cGrpId, sizeof(*cGrpId));
2,828✔
3179
    TSDB_CHECK_NULL(pExtW->pTGrpCtx->pCCtx, code, lino, _exit, terrno);
2,828✔
3180

3181
    TAOS_CHECK_EXIT(extWinAggOutputSingleCGrpRes(pOperator, pExtW, &grpDone));
2,828✔
3182
    if (pBlock->info.rows > 0) {
2,828✔
3183
      if (grpDone) {
2,828✔
3184
        pExtW->lastGrpIdx++;
2,828✔
3185
      }
3186
      
3187
      break;
2,828✔
3188
    }
3189
  }
3190

3191
  extWinPostUpdateStreamRt(pStream, pOperator, pExtW);
9,292✔
3192
  
3193
_exit:
9,292✔
3194

3195
  if (code != TSDB_CODE_SUCCESS) {
9,292✔
NEW
3196
    qError("%s %s failed at line %d since %s", GET_TASKID(pOperator->pTaskInfo), __func__, lino, tstrerror(code));
×
3197
  }
3198

3199
  return code;
9,292✔
3200
}
3201

3202

3203
static int32_t extWinAggOutputRes(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,553,019✔
3204
  SExternalWindowOperator* pExtW = pOperator->info;
1,553,019✔
3205
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
1,553,019✔
3206
  SSDataBlock*    pBlock = pExtW->binfo.pRes;
1,553,019✔
3207
  int32_t         code = TSDB_CODE_SUCCESS;
1,553,019✔
3208
  int32_t         lino = 0;
1,553,019✔
3209
  SStreamRuntimeFuncInfo*  pStream = pTaskInfo->pStreamRuntimeInfo ? &pTaskInfo->pStreamRuntimeInfo->funcInfo : NULL;
1,553,019✔
3210

3211
  pBlock->info.version = pTaskInfo->version;
1,553,019✔
3212
  blockDataCleanup(pBlock);
1,553,019✔
3213
  taosArrayClear(pExtW->pWinRowIdx);
1,553,019✔
3214

3215
  if (pExtW->needGroupSort) {
1,553,019✔
3216
    if (pStream && pStream->isMultiGroupCalc) {
40,804✔
3217
      if (pExtW->calcWithPartition) {
9,292✔
3218
        TAOS_CHECK_EXIT(extWinAggOutputMulOrderTCGrpsRes(pOperator, pExtW));
9,292✔
3219
      } else {
NEW
3220
        TAOS_CHECK_EXIT(extWinAggOutputMulOrderTGrpsRes(pOperator, pExtW));
×
3221
      }
3222
    } else {
3223
      TAOS_CHECK_EXIT(extWinAggOutputMulOrderCGrpsRes(pOperator, pExtW));
31,512✔
3224
    }
3225
  } else {
3226
    if (pStream && pStream->isMultiGroupCalc) {
1,512,215✔
3227
      TAOS_CHECK_EXIT(extWinAggOutputMulNoOrderTGrpsRes(pOperator, pExtW));
60,600✔
3228
    } else if (pExtW->calcWithPartition) {
1,451,615✔
3229
      TAOS_CHECK_EXIT(extWinAggOutputMulNoOrderCGrpsRes(pOperator, pExtW));
138,976✔
3230
    } else {
3231
      TAOS_CHECK_EXIT(extWinAggOutputSingleCGrpRes(pOperator, pExtW, NULL));
1,312,639✔
3232
    }
3233
  }
3234
  
3235
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
1,553,019✔
3236
         pBlock->info.id.groupId);
3237

3238
  int32_t rowsBeforeFilter = pBlock->info.rows;
1,553,019✔
3239
  SColumnInfoData* pFilterRes = NULL;
1,553,019✔
3240
  TAOS_CHECK_EXIT(doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, &pFilterRes));
1,553,019✔
3241
  if (pBlock->info.rows < rowsBeforeFilter) {
1,552,615✔
3242
    if (pFilterRes != NULL) {
2,424✔
3243
      TAOS_CHECK_EXIT(extWinRebuildWinIdxByFilter(pTaskInfo, pExtW->pWinRowIdx, rowsBeforeFilter, pFilterRes));
2,424✔
3244
    } else {
3245
      // no indicator means all rows are filtered out by short-circuit path
NEW
3246
      taosArrayClear(pExtW->pWinRowIdx);
×
3247
    }
3248
  }
3249

3250
  pBlock->info.dataLoad = 1;
1,552,615✔
3251
  extWinAssignBlockGrpId(pOperator, pExtW, &pBlock->info.id);
1,553,019✔
3252

3253
  qDebug("%s result generated, rows:%" PRId64 ", cGrpId:%" PRIu64 " baseGid:%" PRIu64, 
1,553,019✔
3254
    GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.id.groupId, pBlock->info.id.baseGId);
3255

3256
  *ppRes = (pBlock->info.rows > 0) ? pBlock : NULL;
1,553,019✔
3257

3258
  if (*ppRes) {
1,553,019✔
3259
    (*ppRes)->info.window.skey = pExtW->orgTableTimeRange.skey;
1,247,595✔
3260
    (*ppRes)->info.window.ekey = pExtW->orgTableTimeRange.ekey;
1,247,595✔
3261
  }
3262
  if (pOperator->pTaskInfo->pStreamRuntimeInfo) {
1,553,019✔
3263
    pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx = pExtW->pWinRowIdx;
465,408✔
3264
  }
3265

3266
_exit:
1,553,019✔
3267
  colDataDestroy(pFilterRes);
1,553,019✔
3268
  taosMemoryFree(pFilterRes);
1,553,019✔
3269

3270
  if (code != TSDB_CODE_SUCCESS) {
1,553,019✔
NEW
3271
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3272
  }
3273

3274
  return code;
1,553,019✔
3275
}
3276

3277
static void extWinFreeResultRow(SExternalWindowOperator* pExtW) {
355,116✔
3278
  if (pExtW->resultRows.resRowAllcNum * pExtW->aggSup.resultRowSize >= 1048576) {
355,116✔
3279
    int32_t i = 1;
28,684✔
3280
    while (i < pExtW->resultRows.resRowsSize && pExtW->resultRows.pResultRows[i]) {
28,684✔
NEW
3281
      taosMemoryFreeClear(pExtW->resultRows.pResultRows[i]);
×
NEW
3282
      pExtW->resultRows.resRowAllcNum -= pExtW->resultRows.resRowSize;
×
NEW
3283
      i++;
×
3284
    }
3285
  }
3286
  
3287
  if (pExtW->binfo.pRes && pExtW->binfo.pRes->info.rows * pExtW->aggSup.resultRowSize >= 1048576) {
355,116✔
NEW
3288
    blockDataFreeCols(pExtW->binfo.pRes);
×
3289
  }
3290
}
355,116✔
3291

3292
static bool extWinNonAggGotResBlock(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW) {
34,744✔
3293
  if (pExtW->calcWithPartition) {
34,744✔
3294
    return false;
18,584✔
3295
  }
3296

3297
  if ((pExtW->multiTableMode && !pExtW->inputHasOrder) || pExtW->needGroupSort) {
16,160✔
3298
    return false;
10,504✔
3299
  }
3300
  int32_t remainWin = pExtW->resWinIdx - pExtW->outWinIdx;
5,656✔
3301
  if (remainWin > 1 && (NULL == pExtW->timeRangeExpr || !pExtW->timeRangeExpr->needCalc)) {
5,656✔
NEW
3302
    return true;
×
3303
  }
3304
  
3305
  SList* pList = taosArrayGetP(pExtW->pOutputBlocks, pExtW->outWinIdx);
5,656✔
3306
  if (!pList || listNEles(pList) <= 0) {
5,656✔
NEW
3307
    return false;
×
3308
  }
3309
  if (listNEles(pList) > 1) {
5,656✔
3310
    return true;
404✔
3311
  }
3312

3313
  SListNode* pNode = listHead(pList);
5,252✔
3314
  SArray* pIdx = *(SArray**)((SArray**)pNode->data + 1);
5,252✔
3315
  int32_t* winIdx = taosArrayGetLast(pIdx);
5,252✔
3316
  if (winIdx && *winIdx < pExtW->pTGrpCtx->pCCtx->blkWinStartIdx) {
5,252✔
3317
    return true;
2,020✔
3318
  }
3319

3320
  return false;
3,232✔
3321
}
3322

3323
static int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
4,097,478✔
3324
  int32_t code = TSDB_CODE_SUCCESS;
4,097,478✔
3325
  int32_t lino = 0;
4,097,478✔
3326
  int32_t tsIndex = -1;
4,097,478✔
3327
  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
5,834,893✔
3328
    SColumnInfoData *pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
5,834,893✔
3329
    QUERY_CHECK_NULL(pCol, code, lino, _return, terrno)
5,834,893✔
3330
    if (pCol->info.colId == tsSlotId) {
5,834,893✔
3331
      tsIndex = i;
4,097,478✔
3332
      break;
4,097,478✔
3333
    }
3334
  }
3335

3336
  if (tsIndex == -1) {
4,097,478✔
3337
    tsIndex = (int32_t)taosArrayGetSize(pBlock->pDataBlock) - 1;
×
3338
  }
3339

3340
  SColumnInfoData *pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsIndex);
4,097,478✔
3341
  QUERY_CHECK_NULL(pColData, code, lino, _return, terrno)
4,097,478✔
3342

3343
  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, 0), 0);
4,097,478✔
3344
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, pBlock->info.rows - 1), 0);
4,097,478✔
3345

3346
  return code;
4,097,478✔
3347
_return:
×
3348
  qError("failed to get time window of block, %s code:%s, line:%d", __func__, tstrerror(code), lino);
×
3349
  return code;
×
3350
}
3351

3352
static void extWinEndClearCtxs(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
1,445,959✔
3353
  if (pTaskInfo == NULL || pTaskInfo->pStreamRuntimeInfo == NULL) {
1,445,959✔
3354
    return;
1,087,611✔
3355
  }
3356

3357
  SStreamRuntimeFuncInfo* pInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
358,348✔
3358
  if (!pInfo->isMultiGroupCalc) {
358,348✔
3359
    return;
289,668✔
3360
  }
3361

3362
  pInfo->pStreamPesudoFuncVals = NULL;
68,680✔
3363
  pInfo->pStreamPartColVals = NULL;
68,680✔
3364
}
3365

3366

3367
static int32_t extWinAggHandleMultiTGrpEmptyWins(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW) {
49,288✔
3368
  int32_t         code = TSDB_CODE_SUCCESS;
49,288✔
3369
  int32_t         lino = 0;
49,288✔
3370
  int32_t         iter = 0;
49,288✔
3371
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
49,288✔
3372
  SStreamRuntimeFuncInfo* pStream = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
49,288✔
3373
  SSTriggerGroupCalcInfo* pGroup = NULL;
49,288✔
3374

3375
  if (NULL == pExtW->pEmptyInputBlock) {
49,288✔
3376
    goto _exit;
49,288✔
3377
  }
3378
  
NEW
3379
  pGroup = tSimpleHashIterate(pStream->pGroupCalcInfos, NULL, &iter);
×
NEW
3380
  while (pGroup != NULL) {
×
NEW
3381
    pExtW->lastTGrpId = *(uint64_t*)tSimpleHashGetKey(pGroup, NULL);
×
3382

NEW
3383
    if (NULL == pGroup->pRunnerGrpCtx) {
×
NEW
3384
      pStream->curGrpCalc = pGroup;
×
3385

NEW
3386
      SExtWinTrigGrpCtx* pTGCtx = taosMemoryCalloc(1, sizeof(SExtWinTrigGrpCtx));
×
NEW
3387
      TSDB_CHECK_NULL(pTGCtx, code, lino, _exit, terrno);
×
3388

NEW
3389
      pTGCtx->pCCtx = taosMemoryCalloc(1, sizeof(*pTGCtx->pCCtx));
×
NEW
3390
      TSDB_CHECK_NULL(pTGCtx->pCCtx, code, lino, _exit, terrno);
×
NEW
3391
      TAOS_CHECK_EXIT(extWinInitCGrpCtx(pExtW, pOperator->pTaskInfo, pTGCtx->pCCtx));
×
NEW
3392
      pGroup->pRunnerGrpCtx = pTGCtx;
×
3393
    }
3394

NEW
3395
    pExtW->pTGrpCtx = pGroup->pRunnerGrpCtx;
×
NEW
3396
    pExtW->ownTGrpCtx = false;
×
NEW
3397
    TAOS_CHECK_EXIT(extWinAggHandleEmptyWins(pOperator, NULL, true, NULL));
×
3398
    
NEW
3399
    pGroup = tSimpleHashIterate(pStream->pGroupCalcInfos, pGroup, &iter);
×
3400
  }
3401

3402
_exit:
49,288✔
3403

3404
  if (code != TSDB_CODE_SUCCESS) {
49,288✔
NEW
3405
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3406
  }
3407

3408
  return code;
49,288✔
3409
}
3410

3411
static void extWinPrepareForOutput(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW) {
1,443,535✔
3412
  pExtW->lastGrpIdx = 0;
1,443,535✔
3413
  pExtW->lastOutputIter = 0;
1,443,535✔
3414
  pExtW->lastTGrpId = 0;
1,443,535✔
3415
  pExtW->lastCGrpId = 0;
1,443,535✔
3416

3417
  if (pOperator == NULL || pOperator->pTaskInfo == NULL || pOperator->pTaskInfo->pStreamRuntimeInfo == NULL) {
1,443,535✔
3418
    return;
1,087,611✔
3419
  }
3420

3421
  SStreamRuntimeFuncInfo* pStream = &pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo;
355,924✔
3422

3423
  pStream->curGrpCalc = NULL;
355,924✔
3424
  pStream->curGrpRead = NULL;
355,924✔
3425

3426
  if (pExtW->needGroupSort) {
355,924✔
3427
    if (pStream->isMultiGroupCalc && pExtW->calcWithPartition) {
27,472✔
3428
      if (pExtW->pCTGrpIds) {
6,464✔
3429
        taosArraySort(pExtW->pCTGrpIds, extWinGrpIdCompare);
2,828✔
3430
      }
3431
    } else {
3432
      if (pExtW->pGrpIds) {
21,008✔
3433
        taosArraySort(pExtW->pGrpIds, extWinGrpIdCompare);
10,100✔
3434
      }
3435
    }
3436
  }
3437

3438
  // In multi-group output mode, always restart from iterator state instead of
3439
  // inheriting the calc phase's current group/cgrp cursor.
3440
  if (pStream->isMultiGroupCalc) {
355,924✔
3441
    pExtW->pTGrpCtx = NULL;
68,680✔
3442
    pExtW->ownTGrpCtx = false;
68,680✔
3443
  }
3444

3445
  if ((!pStream->isMultiGroupCalc) && pExtW->calcWithPartition && pExtW->pTGrpCtx) {
355,924✔
3446
    pExtW->pTGrpCtx->pCCtx = NULL;
55,752✔
3447
  }
3448
}
3449

3450
static int32_t extWinEnsureDynParamOpenCtx(SExternalWindowOperator* pExtW) {
1,088,851✔
3451
  int32_t                 code = TSDB_CODE_SUCCESS;
1,088,851✔
3452
  int32_t                 lino = 0;
1,088,851✔
3453
  if (pExtW->pTGrpCtx == NULL) {
1,088,851✔
3454
    pExtW->pTGrpCtx = taosMemoryCalloc(1, sizeof(*pExtW->pTGrpCtx));
1,038,418✔
3455
    TSDB_CHECK_NULL(pExtW->pTGrpCtx, code, lino, _exit, terrno);
1,038,418✔
3456
    pExtW->ownTGrpCtx = true;
1,038,418✔
3457
  }
3458

3459
  if (pExtW->pTGrpCtx->pCCtx == NULL) {
1,088,851✔
3460
    pExtW->pTGrpCtx->pCCtx = taosMemoryCalloc(1, sizeof(*pExtW->pTGrpCtx->pCCtx));
1,038,418✔
3461
    TSDB_CHECK_NULL(pExtW->pTGrpCtx->pCCtx, code, lino, _exit, terrno);
1,038,418✔
3462
    extWinInitDynParamCGrpCtx(pExtW->pTGrpCtx->pCCtx);
1,038,418✔
3463
  }
3464

3465
_exit:
50,433✔
3466
  if (code != TSDB_CODE_SUCCESS) {
1,088,851✔
NEW
3467
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3468
  }
3469

3470
  return code;
1,088,851✔
3471
}
3472

3473
static int32_t extWinOpen(SOperatorInfo* pOperator) {
1,447,199✔
3474
  if (OPTR_IS_OPENED(pOperator) && !pOperator->pOperatorGetParam) {
1,447,199✔
3475
    return TSDB_CODE_SUCCESS;
×
3476
  }
3477

3478
  int32_t                  code = 0;
1,447,199✔
3479
  int32_t                  lino = 0;
1,447,199✔
3480
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,447,199✔
3481
  SOperatorInfo*           pDownstream = pOperator->pDownstream[0];
1,447,199✔
3482
  SExternalWindowOperator* pExtW = pOperator->info;
1,447,199✔
3483
  SExprSupp*               pSup = &pOperator->exprSupp;
1,447,199✔
3484
  SStreamRuntimeFuncInfo*  pInfo = pTaskInfo->pStreamRuntimeInfo ? &pTaskInfo->pStreamRuntimeInfo->funcInfo : NULL;
1,447,199✔
3485

3486
  if (pOperator->pOperatorGetParam) {
1,447,199✔
3487
    SOperatorParam*               pParam = (SOperatorParam*)(pOperator->pOperatorGetParam);
1,088,851✔
3488
    SOperatorParam*               pDownParam = (SOperatorParam*)(pOperator->pDownstreamGetParams[0]);
1,088,851✔
3489
    SExchangeOperatorParam*       pExecParam = NULL;
1,088,851✔
3490
    SExternalWindowOperatorParam* pExtPram = (SExternalWindowOperatorParam*)pParam->value;
1,088,851✔
3491

3492
    TAOS_CHECK_EXIT(extWinEnsureDynParamOpenCtx(pExtW));
1,088,851✔
3493

3494
    if (pExtW->pTGrpCtx->pCCtx->pWins) {
1,088,851✔
3495
      taosArrayDestroy(pExtW->pTGrpCtx->pCCtx->pWins);
50,433✔
3496
    }
3497

3498
    extWinInitDynParamCGrpCtx(pExtW->pTGrpCtx->pCCtx);
1,088,851✔
3499
    pExtW->pTGrpCtx->pCCtx->pWins = pExtPram->ExtWins;
1,088,851✔
3500

3501
    pExtPram->ExtWins = NULL;
1,088,851✔
3502
    pExtW->outWinIdx = 0;
1,088,851✔
3503
    pExtW->isDynWindow = true;
1,088,851✔
3504
    pExtW->orgTableTimeRange.skey = INT64_MAX;
1,088,851✔
3505
    pExtW->orgTableTimeRange.ekey = INT64_MIN;
1,088,851✔
3506

3507
    QUERY_CHECK_CONDITION(pOperator->numOfDownstream == 1, code, lino, _exit, TSDB_CODE_INVALID_PARA)
1,088,851✔
3508

3509
    switch (pDownParam->opType) {
1,088,851✔
3510
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
1,088,851✔
3511
        pExecParam = (SExchangeOperatorParam*)((SOperatorParam*)(pOperator->pDownstreamGetParams[0]))->value;
1,088,851✔
3512
        if (!pExecParam->multiParams) {
1,088,851✔
3513
          pExecParam->basic.vgId = pExtW->orgTableVgId;
753,561✔
3514
          taosArrayClear(pExecParam->basic.uidList);
753,561✔
3515
          QUERY_CHECK_NULL(taosArrayPush(pExecParam->basic.uidList, &pExtW->orgTableUid), code, lino, _exit, terrno)
1,507,122✔
3516
        }
3517
        break;
1,088,851✔
3518
      }
3519
      default:
×
3520
        break;
×
3521
    }
3522

3523
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
1,088,851✔
3524
    pOperator->pOperatorGetParam = NULL;
1,088,851✔
3525
  }
3526

3527
  while (1) {
4,422,294✔
3528
    SSDataBlock* pBlock = getNextBlockFromDownstreamRemainDetach(pOperator, 0);
5,869,493✔
3529
    if (pBlock == NULL) {
5,868,253✔
3530
      if (EEXT_MODE_AGG == pExtW->mode) {
1,443,535✔
3531
        TAOS_CHECK_EXIT((pInfo && pInfo->isMultiGroupCalc) ? extWinAggHandleMultiTGrpEmptyWins(pOperator, pExtW)
1,393,035✔
3532
                                                           : extWinAggHandleEmptyWins(pOperator, pBlock, true, NULL));
3533
      }
3534

3535
      break;
1,443,535✔
3536
    }
3537

3538
    if (pExtW->isDynWindow) {
4,424,718✔
3539
      TSKEY skey = 0;
4,097,478✔
3540
      TSKEY ekey = 0;
4,097,478✔
3541
      code = getTimeWindowOfBlock(pBlock, pExtW->primaryTsIndex, &skey, &ekey);
4,097,478✔
3542
      QUERY_CHECK_CODE(code, lino, _exit);
4,097,478✔
3543
      pExtW->orgTableTimeRange.skey = TMIN(pExtW->orgTableTimeRange.skey, skey);
4,097,478✔
3544
      pExtW->orgTableTimeRange.ekey = TMAX(pExtW->orgTableTimeRange.ekey, ekey);
4,097,478✔
3545
    }
3546

3547
    printDataBlock(pBlock, __func__, pTaskInfo->id.str, pTaskInfo->id.queryId);
4,424,718✔
3548

3549
    qInfo("%s ext window mode:%s baseGrp:%" PRIu64 " grp:%" PRIu64 " got %" PRId64 " rows from downstream",
4,424,718✔
3550
        GET_TASKID(pTaskInfo), extWinModeStr(pExtW->mode), pBlock->info.id.baseGId, pBlock->info.id.groupId, pBlock->info.rows);
3551

3552
    // Fallback for non-stream grouped external-window:
3553
    // 1) if outer calc is partitioned and downstream only carries groupId,
3554
    //    treat groupId as the corresponding trigger-group id;
3555
    // 2) otherwise, if there is exactly one trigger group from subquery,
3556
    //    use that singleton gid.
3557
    if (pBlock->info.id.baseGId == 0 && pTaskInfo->pStreamRuntimeInfo &&
4,424,718✔
3558
        pTaskInfo->pStreamRuntimeInfo->funcInfo.isMultiGroupCalc &&
313,908✔
3559
        pTaskInfo->pStreamRuntimeInfo->funcInfo.pGroupCalcInfos) {
40,804✔
3560
      if (pExtW->calcWithPartition && pBlock->info.id.groupId != 0 &&
81,608✔
3561
          tSimpleHashGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pGroupCalcInfos,
40,804✔
3562
                         &pBlock->info.id.groupId, sizeof(pBlock->info.id.groupId)) != NULL) {
40,804✔
3563
        pBlock->info.id.baseGId = pBlock->info.id.groupId;
28,280✔
3564
        qDebug("%s extWin fallback baseGId <- matched groupId %" PRIu64,
28,280✔
3565
               GET_TASKID(pTaskInfo), pBlock->info.id.groupId);
3566
      } else {
3567
        int32_t __iter = 0;
12,524✔
3568
        int32_t __size = tSimpleHashGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pGroupCalcInfos);
12,524✔
3569
        if (__size == 1) {
12,524✔
3570
          SSTriggerGroupCalcInfo* __one = tSimpleHashIterate(pTaskInfo->pStreamRuntimeInfo->funcInfo.pGroupCalcInfos, NULL, &__iter);
4,848✔
3571
          if (__one) {
4,848✔
3572
            uint64_t __gid = *(uint64_t*)tSimpleHashGetKey(__one, NULL);
4,848✔
3573
            pBlock->info.id.baseGId = __gid;
4,848✔
3574
            qDebug("%s extWin fallback baseGId set to single gid %" PRIu64, GET_TASKID(pTaskInfo), __gid);
4,848✔
3575
          }
3576
        }
3577
      }
3578
    }
3579

3580
    // Equality gating in partitioned multi-group mode:
3581
    // process the block only when baseGId == groupId; otherwise skip it.
3582
    if (pTaskInfo->pStreamRuntimeInfo &&
4,424,718✔
3583
        pTaskInfo->pStreamRuntimeInfo->funcInfo.isMultiGroupCalc &&
327,240✔
3584
        pExtW->calcWithPartition &&
54,136✔
3585
        pBlock->info.id.groupId != 0 && pBlock->info.id.baseGId == 0) {
54,136✔
3586
      qDebug("%s skip block: no matched trigger group for groupId %" PRIu64,
7,676✔
3587
             GET_TASKID(pTaskInfo), pBlock->info.id.groupId);
3588
      continue;
7,676✔
3589
    }
3590

3591
    if (pTaskInfo->pStreamRuntimeInfo &&
4,417,042✔
3592
        pTaskInfo->pStreamRuntimeInfo->funcInfo.isMultiGroupCalc &&
319,564✔
3593
        pExtW->calcWithPartition &&
46,460✔
3594
        pBlock->info.id.groupId != 0 && pBlock->info.id.baseGId != 0 &&
46,460✔
3595
        pBlock->info.id.groupId != pBlock->info.id.baseGId) {
46,460✔
3596
      qDebug("%s skip block: baseGId %" PRIu64 " != groupId %" PRIu64,
17,372✔
3597
             GET_TASKID(pTaskInfo), pBlock->info.id.baseGId, pBlock->info.id.groupId);
3598
      continue;
17,372✔
3599
    }
3600

3601
    TAOS_CHECK_EXIT(extWinSwitchInitCtxs(pExtW, pTaskInfo, &pBlock->info.id));
4,399,670✔
3602

3603
    // Reset block-local traversal state for each new input block, but preserve
3604
    // blkWinStartIdx so a window spanning multiple blocks can continue from the
3605
    // same logical window on the next block.
3606
    extWinResetBlockCalcState(pExtW->pTGrpCtx ? pExtW->pTGrpCtx->pCCtx : NULL);
4,399,670✔
3607

3608
    switch (pExtW->mode) {
4,399,670✔
3609
      case EEXT_MODE_SCALAR:
34,744✔
3610
        TAOS_CHECK_EXIT(extWinProjectOpen(pOperator, pBlock));
34,744✔
3611
        if (extWinNonAggGotResBlock(pOperator, pExtW)) {
34,744✔
3612
          goto _exit;
2,424✔
3613
        }
3614
        break;
32,320✔
3615
      case EEXT_MODE_AGG:
4,364,926✔
3616
        TAOS_CHECK_EXIT(extWinAggOpen(pOperator, pBlock));
4,364,926✔
3617
        break;
4,364,926✔
3618
      case EEXT_MODE_INDEFR_FUNC:
×
3619
        TAOS_CHECK_EXIT(extWinIndefRowsOpen(pOperator, pBlock));
×
NEW
3620
        if (extWinNonAggGotResBlock(pOperator, pExtW)) {
×
NEW
3621
          goto _exit;
×
3622
        }
3623
        break;
×
3624
      default:
×
3625
        break;
×
3626
    }
3627
  }
3628

3629
  if (pOperator->pOperatorGetParam) {
1,443,535✔
3630
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
×
3631
    pOperator->pOperatorGetParam = NULL;
×
3632
  }
3633
  OPTR_SET_OPENED(pOperator);
1,443,535✔
3634

3635
  extWinPrepareForOutput(pOperator, pExtW);
1,443,535✔
3636

3637
_exit:
1,445,959✔
3638

3639
  extWinEndClearCtxs(pExtW, pTaskInfo);
1,445,959✔
3640

3641
  if (code != 0) {
1,445,959✔
3642
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
3643
    pTaskInfo->code = code;
×
3644
    T_LONG_JMP(pTaskInfo->env, code);
×
3645
  }
3646

3647
  return code;
1,445,959✔
3648
}
3649

3650
static int32_t extWinNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
5,693,643✔
3651
  int32_t                  code = 0;
5,693,643✔
3652
  int32_t                  lino = 0;
5,693,643✔
3653
  SExternalWindowOperator* pExtW = pOperator->info;
5,693,643✔
3654
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
5,693,643✔
3655
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
5,694,047✔
UNCOV
3656
    *ppRes = NULL;
×
3657
    return code;
×
3658
  }
3659

3660
  if (pOperator->pOperatorGetParam) {
5,694,047✔
3661
    if (pOperator->status == OP_EXEC_DONE) {
1,088,851✔
3662
      pOperator->status = OP_NOT_OPENED;
50,433✔
3663
    }
3664
  }
3665

3666
  extWinRecycleBlkNode(pExtW, &pExtW->pLastBlkNode);
5,694,047✔
3667

3668
  while (1) {
3669
    if (pOperator->status == OP_NOT_OPENED) {
5,694,047✔
3670
      TAOS_CHECK_EXIT(pOperator->fpSet._openFn(pOperator));
1,447,199✔
3671
    }
3672

3673
    if (pExtW->mode == EEXT_MODE_SCALAR || pExtW->mode == EEXT_MODE_INDEFR_FUNC) {
5,692,807✔
3674
      TAOS_CHECK_EXIT(extWinNonAggOutputRes(pOperator, ppRes));
4,139,788✔
3675
      if (NULL == *ppRes) {
4,139,788✔
3676
        setOperatorCompleted(pOperator);
49,692✔
3677
        extWinFreeResultRow(pExtW);
49,692✔
3678
      }
3679
    } else {
3680
      TAOS_CHECK_EXIT(extWinAggOutputRes(pOperator, ppRes));
1,553,019✔
3681
      if (NULL != *ppRes) {
1,553,019✔
3682
        break;
1,247,595✔
3683
      }
3684

3685
      if (pExtW->pTGrpCtx == NULL || pExtW->pTGrpCtx->pCCtx == NULL ||
305,424✔
3686
          (pExtW->isDynWindow ? (pExtW->pTGrpCtx->pCCtx->outWinIdx >= taosArrayGetSize(pExtW->pTGrpCtx->pCCtx->pWins))
81,204✔
3687
                              : (pExtW->pTGrpCtx->pCCtx->outWinNum >= pExtW->pTGrpCtx->pCCtx->outWinTotalNum))) {
81,204✔
3688
        setOperatorCompleted(pOperator);
305,424✔
3689
        if (pTaskInfo->pStreamRuntimeInfo) {
305,424✔
3690
          extWinFreeResultRow(pExtW);
305,424✔
3691
        }
3692
        break;
305,424✔
3693
      }
3694
    }
3695
    break;
4,139,788✔
3696
  }
3697

3698
  if (*ppRes && (*ppRes)->info.rows > 0) {
5,692,807✔
3699
    qDebug("%s tgrp %" PRIu64 " cgrp %" PRIu64 " ext window return block with %" PRId64 " rows", 
5,337,691✔
3700
      GET_TASKID(pTaskInfo), pExtW->lastTGrpId, pExtW->lastCGrpId, (*ppRes)->info.rows);
3701
        
3702
    pOperator->resultInfo.totalRows += (*ppRes)->info.rows;
5,337,691✔
3703
    printDataBlock(*ppRes, __func__, GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
5,337,691✔
3704
  }
3705

3706
_exit:
355,116✔
3707

3708
  if (code) {
5,692,807✔
3709
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
×
3710
    pTaskInfo->code = code;
×
3711
    T_LONG_JMP(pTaskInfo->env, code);
×
3712
  }
3713

3714
  if ((*ppRes) && (*ppRes)->info.rows <= 0) {
5,692,807✔
3715
    *ppRes = NULL;
×
3716
  }
3717

3718
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM && (*ppRes)) {
5,692,807✔
UNCOV
3719
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
×
3720
  }
3721

3722
  return code;
5,692,807✔
3723
}
3724

3725
static int32_t extWinInitNonStreamWindowDataFromBlock(SExternalWindowPhysiNode* pPhynode, SExecTaskInfo* pTaskInfo, STimeWindow* pTimeRange);
3726
static int32_t extWinValidateNonStreamBlock(SSDataBlock* pBlock, SColumnInfoData** ppStartCol,
3727
                                            SColumnInfoData** ppEndCol, int32_t* pNumRows, int32_t* pNumCols);
3728
static int32_t extWinCheckMonotonicWstart(bool hasPrevStart, int64_t prevStart, int64_t currStart, int32_t row);
3729
static int32_t extWinBuildExternalWindowDataForRow(SSDataBlock* pBlock, int32_t numCols, int32_t row,
3730
                                                   SArray** ppExternalData);
3731
static int32_t extWinBuildTriggerParamForRow(SSDataBlock* pBlock, SColumnInfoData* pStartCol, SColumnInfoData* pEndCol,
3732
                                             int32_t numCols, int32_t row, SSTriggerCalcParam* pParam);
3733

3734
int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
1,396,406✔
3735
                                     SOperatorInfo** pOptrOut) {
3736
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
1,396,406✔
3737
  QRY_PARAM_CHECK(pOptrOut);
1,396,406✔
3738
  int32_t                  code = 0;
1,396,002✔
3739
  int32_t                  lino = 0;
1,396,002✔
3740
  bool                     isInStream = true;
1,396,002✔
3741
  STimeWindow              nonStreamExtWinRange = {.skey = INT64_MAX, .ekey = INT64_MIN};
1,396,002✔
3742
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
1,396,002✔
3743
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,395,598✔
3744
  pOperator->pPhyNode = pNode;
1,396,002✔
3745
  if (!pExtW || !pOperator) {
1,396,002✔
3746
    code = terrno;
404✔
3747
    lino = __LINE__;
×
3748
    goto _error;
×
3749
  }
3750
  initOperatorCostInfo(pOperator);
1,395,598✔
3751
  pExtW->pTaskInfo = pTaskInfo;
1,396,406✔
3752
  pExtW->needGroupSort = pPhynode->needGroupSort;
1,396,002✔
3753
  // In non-stream (batch) mode, temporarily disable calcWithPartition to tolerate
3754
  // upstream that does not provide distinct C-group ids (groupId==baseGId).
3755
  // Caller may decide to turn it back on once upstream is ready.
3756
  pExtW->calcWithPartition = pPhynode->calcWithPartition;
1,396,406✔
3757
  // pExtW->calcWithPartition = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? pPhynode->calcWithPartition : false;
3758
  pExtW->extWinSplit = pPhynode->extWinSplit;
1,396,406✔
3759
  
3760
  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
1,395,598✔
3761
                  pExtW, pTaskInfo);
3762
                  
3763
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
1,396,002✔
3764
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,396,406✔
3765
  initBasicInfo(&pExtW->binfo, pResBlock);
1,396,406✔
3766

3767
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
1,396,406✔
3768
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
1,396,406✔
3769
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
1,396,406✔
3770
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
1,396,406✔
3771
  pExtW->isDynWindow = false;
1,396,406✔
3772

3773
  qDebug("%s create extWin operator, execModel:%d, phySubquery:%p", GET_TASKID(pTaskInfo), pTaskInfo->execModel,
1,396,406✔
3774
        pPhynode->pSubquery);
3775

3776
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
1,396,406✔
UNCOV
3777
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
×
3778
  }
3779
  
3780
  // pExtW->limitInfo = (SLimitInfo){0};
3781
  // initLimitInfo(pPhynode->window.node.pLimit, pPhynode->window.node.pSlimit, &pExtW->limitInfo);
3782

3783
  if (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {
1,396,406✔
3784
    isInStream = false;
1,396,406✔
3785
    // If pre-init has not been performed, initialize from subquery blocks here.
3786
    if (pTaskInfo->pStreamRuntimeInfo == NULL ||
1,396,406✔
NEW
3787
        pTaskInfo->pStreamRuntimeInfo->funcInfo.pGroupCalcInfos == NULL) {
×
3788
      code = extWinInitNonStreamWindowDataFromBlock(pPhynode, pTaskInfo, &nonStreamExtWinRange);
1,396,406✔
3789
      if (code != TSDB_CODE_SUCCESS) {
1,396,406✔
3790
        lino = __LINE__;
808✔
3791
        goto _error;
808✔
3792
      }
3793
    }
3794
  }
3795

3796
  if (pPhynode->window.pProjs) {
1,395,598✔
3797
    int32_t    numOfScalarExpr = 0;
50,500✔
3798
    SExprInfo* pScalarExprInfo = NULL;
50,500✔
3799
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
50,500✔
3800
    QUERY_CHECK_CODE(code, lino, _error);
50,500✔
3801

3802
    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
50,500✔
3803
    QUERY_CHECK_CODE(code, lino, _error);
50,500✔
3804

3805
    pExtW->pOutputBlocks = taosArrayInit(STREAM_CALC_REQ_MAX_WIN_NUM, POINTER_BYTES);
50,500✔
3806
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _error);
50,500✔
3807

3808
    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
50,500✔
3809
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);
50,500✔
3810
  } else if (pExtW->mode == EEXT_MODE_AGG) {
1,345,098✔
3811
    if (pPhynode->window.pExprs != NULL) {
1,345,098✔
UNCOV
3812
      int32_t    num = 0;
×
UNCOV
3813
      SExprInfo* pSExpr = NULL;
×
UNCOV
3814
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
UNCOV
3815
      QUERY_CHECK_CODE(code, lino, _error);
×
3816
    
UNCOV
3817
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
UNCOV
3818
      if (code != TSDB_CODE_SUCCESS) {
×
3819
        goto _error;
×
3820
      }
UNCOV
3821
      checkIndefRowsFuncs(&pExtW->scalarSupp);
×
3822
    }
3823
    
3824
    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
1,345,098✔
3825
    initResultSizeInfo(&pOperator->resultInfo, 4096);
1,345,098✔
3826
    //code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
3827
    //QUERY_CHECK_CODE(code, lino, _error);
3828

3829
    pExtW->pWinRowIdx = taosArrayInit(4096, sizeof(int64_t));
1,345,098✔
3830
    TSDB_CHECK_NULL(pExtW->pWinRowIdx, code, lino, _error, terrno);
1,345,098✔
3831
    
3832
    int32_t num = 0;
1,345,098✔
3833
    SExprInfo* pExprInfo = NULL;
1,345,098✔
3834
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
1,345,098✔
3835
    QUERY_CHECK_CODE(code, lino, _error);
1,345,098✔
3836
    pOperator->exprSupp.hasWindow = true;
1,345,098✔
3837
    pOperator->exprSupp.hasWindowOrGroup = true;
1,345,098✔
3838
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, 0, 0);
1,345,098✔
3839
    QUERY_CHECK_CODE(code, lino, _error);
1,345,098✔
3840

3841
    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,344,694✔
3842
                              pTaskInfo->pStreamRuntimeInfo);
1,345,098✔
3843
    QUERY_CHECK_CODE(code, lino, _error);
1,344,694✔
3844

3845
    nodesWalkExprs(pPhynode->window.pFuncs, extWinHasCountLikeFunc, &pExtW->hasCountFunc);
1,344,694✔
3846
    if (pExtW->hasCountFunc && pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
1,345,098✔
UNCOV
3847
      code = extWinCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
×
UNCOV
3848
      QUERY_CHECK_CODE(code, lino, _error);
×
UNCOV
3849
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
×
3850
    } else {
3851
      qDebug("%s ext window have CountLikeFunc: %d. IsInStream:%d", pOperator->pTaskInfo->id.str, pExtW->hasCountFunc,
1,345,098✔
3852
             pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
3853
    }
3854

3855
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
1,345,098✔
3856
    QUERY_CHECK_CODE(code, lino, _error);
1,345,098✔
3857
  } else {
UNCOV
3858
    size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
3859
    
3860
    if (pPhynode->window.pExprs != NULL) {
×
3861
      int32_t    num = 0;
×
3862
      SExprInfo* pSExpr = NULL;
×
3863
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
3864
      QUERY_CHECK_CODE(code, lino, _error);
×
3865
    
3866
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
3867
      if (code != TSDB_CODE_SUCCESS) {
×
3868
        goto _error;
×
3869
      }
3870
    }
3871
    
3872
    int32_t    numOfExpr = 0;
×
3873
    SExprInfo* pExprInfo = NULL;
×
3874
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
×
3875
    TSDB_CHECK_CODE(code, lino, _error);
×
3876
    
3877
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
×
3878
                              NULL, &pTaskInfo->storageAPI.functionStore);
3879
    TSDB_CHECK_CODE(code, lino, _error);
×
3880
    pOperator->exprSupp.hasWindowOrGroup = false;
×
3881
    
UNCOV
3882
    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
×
UNCOV
3883
                              pTaskInfo->pStreamRuntimeInfo);
×
3884
    TSDB_CHECK_CODE(code, lino, _error);
×
3885
    
3886
    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
×
3887
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
×
3888
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
×
3889
    TSDB_CHECK_CODE(code, lino, _error);
×
3890

NEW
3891
    pExtW->pOutputBlocks = taosArrayInit(STREAM_CALC_REQ_MAX_WIN_NUM, POINTER_BYTES);
×
3892
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _error);
×
3893

3894
    pExtW->pFreeBlocks = tdListNew(POINTER_BYTES * 2);
×
3895
    QUERY_CHECK_NULL(pExtW->pFreeBlocks, code, lino, _error, terrno);  
×
3896
  }
3897

3898
  pExtW->timeRangeExpr = (STimeRangeNode*)pPhynode->pTimeRange;
1,395,598✔
3899
  if (pExtW->timeRangeExpr) {
1,395,598✔
UNCOV
3900
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pStart, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
×
UNCOV
3901
    QUERY_CHECK_NULL(pExtW->timeRangeExpr->pEnd, code, lino, _error, TSDB_CODE_STREAM_INTERNAL_ERROR);
×
3902
  }
3903

3904
  if (pPhynode->isSingleTable) {
1,395,598✔
3905
    if (!isInStream) {
806,726✔
3906
      pExtW->getWinFp = extWinGetOvlpWin;
806,726✔
3907
      pExtW->multiTableMode = false;
806,726✔
3908
    } else {
NEW
3909
      pExtW->getWinFp =
×
NEW
3910
          (pExtW->timeRangeExpr && (pExtW->timeRangeExpr->needCalc ||
×
NEW
3911
                                    (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP)))
×
3912
              ? extWinGetOvlpWin
NEW
3913
              : extWinGetNoOvlpWin;
×
NEW
3914
      pExtW->multiTableMode = false;
×
3915
    }
3916
  } else {
3917
    if (!isInStream) {
588,872✔
3918
      pExtW->getWinFp = extWinGetMultiTbOvlpWin;
588,872✔
3919
      pExtW->multiTableMode = true;
588,872✔
3920
    } else {
NEW
3921
      pExtW->getWinFp =
×
NEW
3922
          (pExtW->timeRangeExpr && (pExtW->timeRangeExpr->needCalc ||
×
NEW
3923
                                    (pTaskInfo->pStreamRuntimeInfo->funcInfo.addOptions & CALC_SLIDING_OVERLAP)))
×
3924
              ? extWinGetMultiTbOvlpWin
NEW
3925
              : extWinGetMultiTbNoOvlpWin;
×
NEW
3926
      pExtW->multiTableMode = true;
×
3927
    }
3928
  }
3929
  pExtW->inputHasOrder = pPhynode->inputHasOrder;
1,395,598✔
3930
  pExtW->orgTableUid = pPhynode->orgTableUid;
1,395,598✔
3931
  pExtW->orgTableVgId = pPhynode->orgTableVgId;
1,395,598✔
3932

3933
  code = extWinInitResRows(pExtW, pTaskInfo);
1,395,598✔
3934
  TSDB_CHECK_CODE(code, lino, _error);
1,395,598✔
3935

3936
  pOperator->fpSet = createOperatorFpSet(extWinOpen, extWinNext, NULL, destroyExternalWindowOperatorInfo,
1,395,598✔
3937
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
3938
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
1,395,598✔
3939
  code = appendDownstream(pOperator, &pDownstream, 1);
1,395,598✔
3940
  if (code != 0) {
1,395,598✔
3941
    goto _error;
×
3942
  }
3943

3944
  if (!isInStream) {
1,395,598✔
3945
    code = extWinApplyNonStreamTimeRangeToDownstream(pOperator, &nonStreamExtWinRange);
1,395,598✔
3946
    QUERY_CHECK_CODE(code, lino, _error);
1,395,598✔
3947
  }
3948

3949
  *pOptrOut = pOperator;
1,395,598✔
3950

3951
  qDebug("%s extWin operator created, mode:%s, multiTableMnode:%d, inputHasOrder:%d, hasTimeRangeExpr:%d, timeRangeNeedCalc:%d "
1,395,598✔
3952
    "needGroupSort:%d, extWinSplit:%d, calcWithPartition:%d",
3953
    GET_TASKID(pTaskInfo), extWinModeStr(pExtW->mode), pExtW->multiTableMode, pExtW->inputHasOrder, pExtW->timeRangeExpr ? 1 : 0,
3954
    pExtW->timeRangeExpr ? pExtW->timeRangeExpr->needCalc : -1, pExtW->needGroupSort, pExtW->extWinSplit, pExtW->calcWithPartition);
3955
  
3956
  return code;
1,395,598✔
3957

3958
_error:
808✔
3959

3960
  if (pExtW != NULL) {
808✔
3961
    destroyExternalWindowOperatorInfo(pExtW);
808✔
3962
  }
3963

3964
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
808✔
3965
  pTaskInfo->code = code;
808✔
3966
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
808✔
3967
  return code;
808✔
3968
}
3969

3970
// Pre-initialize external-window runtime from subquery results before building children.
3971
// This enables scan operators to see isMultiGroupCalc/curGrpRead and build tablelist with baseGId.
NEW
3972
int32_t extWinPreInitFromSubquery(SPhysiNode* pNode, SExecTaskInfo* pTaskInfo) {
×
NEW
3973
  if (pNode == NULL || pTaskInfo == NULL) {
×
NEW
3974
    return TSDB_CODE_INVALID_PARA;
×
3975
  }
NEW
3976
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
×
NEW
3977
    return TSDB_CODE_SUCCESS;  // stream path unaffected
×
3978
  }
3979
  // If already initialized, skip.
NEW
3980
  if (pTaskInfo->pStreamRuntimeInfo &&
×
NEW
3981
      pTaskInfo->pStreamRuntimeInfo->funcInfo.pGroupCalcInfos != NULL) {
×
NEW
3982
    return TSDB_CODE_SUCCESS;
×
3983
  }
NEW
3984
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
×
NEW
3985
  STimeWindow tmpRange = {.skey = INT64_MAX, .ekey = INT64_MIN};
×
NEW
3986
  return extWinInitNonStreamWindowDataFromBlock(pPhynode, pTaskInfo, &tmpRange);
×
3987
}
3988

3989
static int32_t extWinValidateNonStreamBlock(SSDataBlock* pBlock, SColumnInfoData** ppStartCol,
499,748✔
3990
                                            SColumnInfoData** ppEndCol, int32_t* pNumRows, int32_t* pNumCols) {
3991
  int32_t code = TSDB_CODE_SUCCESS;
499,748✔
3992
  int32_t lino = 0;
499,748✔
3993

3994
  TSDB_CHECK_NULL(pBlock, code, lino, _exit, TSDB_CODE_INVALID_PARA);
499,748✔
3995
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, lino, _exit, TSDB_CODE_INVALID_PARA);
499,748✔
3996
  TSDB_CHECK_NULL(ppStartCol, code, lino, _exit, TSDB_CODE_INVALID_PARA);
499,748✔
3997
  TSDB_CHECK_NULL(ppEndCol, code, lino, _exit, TSDB_CODE_INVALID_PARA);
499,748✔
3998
  TSDB_CHECK_NULL(pNumRows, code, lino, _exit, TSDB_CODE_INVALID_PARA);
499,748✔
3999
  TSDB_CHECK_NULL(pNumCols, code, lino, _exit, TSDB_CODE_INVALID_PARA);
499,748✔
4000

4001
  int32_t numRows = pBlock->info.rows;
499,748✔
4002
  int32_t numCols = taosArrayGetSize(pBlock->pDataBlock);
499,748✔
4003
  if (numCols < 2) {
499,748✔
NEW
4004
    qError("%s invalid external-window block: expected at least 2 columns, got %d", __func__, numCols);
×
NEW
4005
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
4006
  }
4007

4008
  SColumnInfoData* pStartCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, 0);
499,748✔
4009
  SColumnInfoData* pEndCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, 1);
499,748✔
4010
  TSDB_CHECK_NULL(pStartCol, code, lino, _exit, TSDB_CODE_INVALID_PARA);
499,748✔
4011
  TSDB_CHECK_NULL(pEndCol, code, lino, _exit, TSDB_CODE_INVALID_PARA);
499,748✔
4012
  if (pStartCol == NULL || pEndCol == NULL || pStartCol->info.type != TSDB_DATA_TYPE_TIMESTAMP ||
499,748✔
4013
      pEndCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
499,748✔
NEW
4014
    qError("%s invalid external-window block: first two columns must be timestamp, got (%d, %d)", __func__,
×
4015
           pStartCol ? pStartCol->info.type : -1, pEndCol ? pEndCol->info.type : -1);
NEW
4016
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
4017
  }
4018
  *ppStartCol = pStartCol;
499,748✔
4019
  *ppEndCol = pEndCol;
499,748✔
4020
  *pNumRows = numRows;
499,748✔
4021
  *pNumCols = numCols;
499,748✔
4022

4023
_exit:
499,748✔
4024
  return code;
499,748✔
4025
}
4026

4027
static int32_t extWinCheckMonotonicWstart(bool hasPrevStart, int64_t prevStart, int64_t currStart, int32_t row) {
54,004,700✔
4028
  if (hasPrevStart && currStart < prevStart) {
54,004,700✔
4029
    qError("%s invalid external-window block: wstart must be monotonic non-decreasing, row:%d, prev:%" PRId64
808✔
4030
           ", curr:%" PRId64,
4031
           __func__, row, prevStart, currStart);
4032
    return TSDB_CODE_EXT_WIN_SUB_UNORDERED;
808✔
4033
  }
4034

4035
  return TSDB_CODE_SUCCESS;
54,003,892✔
4036
}
4037

NEW
4038
static void extWinDestroyExternalDataValue(void* ptr) {
×
NEW
4039
  SValue* pVal = (SValue*)ptr;
×
NEW
4040
  if (pVal != NULL && (IS_VAR_DATA_TYPE(pVal->type) || pVal->type == TSDB_DATA_TYPE_DECIMAL)) {
×
NEW
4041
    valueClearDatum(pVal, pVal->type);
×
4042
  }
NEW
4043
}
×
4044

4045
static int32_t extWinBuildExternalWindowDataForRow(SSDataBlock* pBlock, int32_t numCols, int32_t row,
53,989,348✔
4046
                                                   SArray** ppExternalData) {
4047
  int32_t code = TSDB_CODE_SUCCESS;
53,989,348✔
4048
  int32_t lino = 0;
53,989,348✔
4049
  SArray* pExternalData = taosArrayInit(numCols - 2, sizeof(SStreamGroupValue));
53,989,348✔
4050
  TSDB_CHECK_NULL(pBlock, code, lino, _exit, TSDB_CODE_INVALID_PARA);
54,102,064✔
4051
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, lino, _exit, TSDB_CODE_INVALID_PARA);
54,102,064✔
4052
  TSDB_CHECK_NULL(ppExternalData, code, lino, _exit, TSDB_CODE_INVALID_PARA);
54,552,524✔
4053
  TSDB_CHECK_NULL(pExternalData, code, lino, _exit, terrno);
54,552,524✔
4054

4055
  for (int32_t col = 2; col < numCols; ++col) {
108,255,840✔
4056
    SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
53,079,944✔
4057
    if (pColData == NULL) {
53,534,444✔
NEW
4058
      continue;
×
4059
    }
4060

4061
    SStreamGroupValue data = {0};
53,534,444✔
4062
    SValue* pVal = &data.data;
53,662,512✔
4063

4064
    pVal->type = pColData->info.type;
53,662,512✔
4065

4066
    if (!colDataIsNull_s(pColData, row)) {
106,660,848✔
4067
      void* pData = colDataGetData(pColData, row);
53,485,560✔
4068
      if (IS_VAR_DATA_TYPE(pVal->type) || pVal->type == TSDB_DATA_TYPE_DECIMAL) {
52,229,120✔
NEW
4069
        int32_t datumLen = IS_VAR_DATA_TYPE(pVal->type) ? calcStrBytesByType(pVal->type, (char*)pData) : pColData->info.bytes;
×
4070
        void* pDataCopy = taosMemoryMalloc(datumLen);
64,640✔
4071
        TSDB_CHECK_NULL(pDataCopy, code, lino, _exit, terrno);
64,640✔
4072
        memcpy(pDataCopy, pData, datumLen);
64,640✔
4073
        valueSetDatum(pVal, pVal->type, pDataCopy, datumLen);
64,640✔
4074
      } else {
4075
        valueSetDatum(pVal, pVal->type, pData, pColData->info.bytes);
53,746,140✔
4076
      }
4077
    } else {
NEW
4078
      data.isNull = true;
×
4079
    }
4080

4081
    TSDB_CHECK_NULL(taosArrayPush(pExternalData, &data), code, lino, _exit, terrno);
53,654,028✔
4082
  }
4083

4084
  *ppExternalData = pExternalData;
55,175,896✔
4085

4086
_exit:
54,052,372✔
4087
  if (code) {
54,052,372✔
NEW
4088
    taosArrayDestroyEx(pExternalData, extWinDestroyExternalDataValue);
×
4089
  }
4090
  return code;
54,826,436✔
4091
}
4092

4093
static int32_t extWinBuildTriggerParamForRow(SSDataBlock* pBlock, SColumnInfoData* pStartCol, SColumnInfoData* pEndCol,
54,110,144✔
4094
                                             int32_t numCols, int32_t row, SSTriggerCalcParam* pParam) {
4095
  int32_t code = TSDB_CODE_SUCCESS;
54,110,144✔
4096
  int32_t lino = 0;
54,110,144✔
4097
  TSDB_CHECK_NULL(pBlock, code, lino, _exit, TSDB_CODE_INVALID_PARA);
54,110,144✔
4098
  TSDB_CHECK_NULL(pStartCol, code, lino, _exit, TSDB_CODE_INVALID_PARA);
54,110,144✔
4099
  TSDB_CHECK_NULL(pEndCol, code, lino, _exit, TSDB_CODE_INVALID_PARA);
54,110,144✔
4100
  TSDB_CHECK_NULL(pParam, code, lino, _exit, TSDB_CODE_INVALID_PARA);
54,110,144✔
4101

4102
  *pParam = (SSTriggerCalcParam){0};
54,110,144✔
4103

4104
  if (!colDataIsNull_s(pStartCol, row)) {
109,174,940✔
4105
    pParam->wstart = *(int64_t*)colDataGetData(pStartCol, row);
54,200,236✔
4106
  }
4107

4108
  if (!colDataIsNull_s(pEndCol, row)) {
110,671,760✔
4109
    pParam->wend = *(int64_t*)colDataGetData(pEndCol, row);
54,272,552✔
4110
  }
4111

4112
  pParam->wduration = pParam->wend - pParam->wstart;
55,370,220✔
4113
  TAOS_CHECK_EXIT(extWinBuildExternalWindowDataForRow(pBlock, numCols, row, &pParam->pExternalWindowData));
54,959,352✔
4114

4115
_exit:
54,243,060✔
4116
  return code;
54,243,060✔
4117
}
4118

4119
#if defined(BUILD_TEST)
4120
static SArray* extWinGetSSDataBlocksInTest(SExternalWindowPhysiNode* pPhynode);
4121
#endif
4122

4123
static uint64_t extWinGetRemoteResultGroupId(const SSDataBlock* pBlock) {
688,012✔
4124
  if (pBlock == NULL) {
688,012✔
NEW
4125
    return 0;
×
4126
  }
4127

4128
  return (pBlock->info.id.baseGId != 0) ? pBlock->info.id.baseGId : pBlock->info.id.groupId;
688,012✔
4129
}
4130

4131
static bool hasGroupedRemoteResult(SArray* pBlocks) {
186,648✔
4132
  if (pBlocks == NULL) {
186,648✔
NEW
4133
    return false;
×
4134
  }
4135

4136
  int32_t blockNum = taosArrayGetSize(pBlocks);
186,648✔
4137
  for (int32_t blockIdx = 0; blockIdx < blockNum; ++blockIdx) {
304,616✔
4138
    SSDataBlock** ppOne = taosArrayGet(pBlocks, blockIdx);
188,264✔
4139
    if (ppOne == NULL || *ppOne == NULL) {
188,264✔
NEW
4140
      continue;
×
4141
    }
4142

4143
    if (extWinGetRemoteResultGroupId(*ppOne) != 0) {
188,264✔
4144
      return true;
70,296✔
4145
    }
4146
  }
4147

4148
  return false;
116,352✔
4149
}
4150

4151
static int32_t extWinAppendNonGroupedCalcParam(SStreamRuntimeFuncInfo* pRt, SSTriggerCalcParam* pParam,
20,744,188✔
4152
                                               bool* pHasPrevStart, int64_t* pPrevStart, int32_t row) {
4153
  int32_t code = TSDB_CODE_SUCCESS;
20,744,188✔
4154

4155
  code = extWinCheckMonotonicWstart(*pHasPrevStart, *pPrevStart, pParam->wstart, row);
20,744,188✔
4156
  if (code != TSDB_CODE_SUCCESS) {
22,569,864✔
4157
    return code;
808✔
4158
  }
4159

4160
  *pPrevStart = pParam->wstart;
22,569,056✔
4161
  *pHasPrevStart = true;
22,569,056✔
4162

4163
  void* pRet = taosArrayPush(pRt->pStreamPesudoFuncVals, pParam);
22,569,460✔
4164
  if (pRet == NULL) {
22,562,592✔
NEW
4165
    return terrno;
×
4166
  }
4167

4168
  return TSDB_CODE_SUCCESS;
22,562,592✔
4169
}
4170

4171
static int32_t extWinGetOrCreateGroupedCalcInfo(SStreamRuntimeFuncInfo* pRt, uint64_t groupId,
31,518,060✔
4172
                                                SSTriggerGroupCalcInfo** ppGroupInfo) {
4173
  int32_t code = TSDB_CODE_SUCCESS;
31,518,060✔
4174
  int32_t lino = 0;
31,518,060✔
4175
  SSTriggerGroupCalcInfo* pGroupInfo = tSimpleHashGet(pRt->pGroupCalcInfos, &groupId, sizeof(groupId));
31,518,060✔
4176

4177
  if (pGroupInfo == NULL) {
32,365,652✔
4178
    SSTriggerGroupCalcInfo info = {0};
139,380✔
4179
    info.pParams = taosArrayInit(4, sizeof(SSTriggerCalcParam));
139,380✔
4180
    TSDB_CHECK_NULL(info.pParams, code, lino, _exit, terrno);
139,380✔
4181

4182
    TAOS_CHECK_EXIT(tSimpleHashPut(pRt->pGroupCalcInfos, &groupId, sizeof(groupId), &info, sizeof(info)));
139,380✔
4183

4184
    pGroupInfo = tSimpleHashGet(pRt->pGroupCalcInfos, &groupId, sizeof(groupId));
139,380✔
4185
    TSDB_CHECK_NULL(pGroupInfo, code, lino, _exit, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
139,380✔
4186
  }
4187

4188
  *ppGroupInfo = pGroupInfo;
32,355,552✔
4189

4190
_exit:
31,519,272✔
4191
  return code;
31,519,272✔
4192
}
4193

4194
static int32_t extWinAppendGroupedCalcParam(SStreamRuntimeFuncInfo* pRt, uint64_t groupId, SSTriggerCalcParam* pParam,
31,511,596✔
4195
                                            int32_t row) {
4196
  int32_t code = TSDB_CODE_SUCCESS;
31,511,596✔
4197
  int32_t lino = 0;
31,511,596✔
4198
  SSTriggerGroupCalcInfo* pGroupInfo = NULL;
31,511,596✔
4199
  bool groupHasPrevStart = false;
31,807,728✔
4200
  int64_t groupPrevStart = 0;
31,807,728✔
4201

4202
  TAOS_CHECK_EXIT(extWinGetOrCreateGroupedCalcInfo(pRt, groupId, &pGroupInfo));
31,807,728✔
4203

4204
  groupHasPrevStart = (taosArrayGetSize(pGroupInfo->pParams) > 0);
31,517,252✔
4205
  if (groupHasPrevStart) {
31,631,180✔
4206
    SSTriggerCalcParam* pLast = taosArrayGetLast(pGroupInfo->pParams);
32,117,192✔
4207
    TSDB_CHECK_NULL(pLast, code, lino, _exit, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
27,179,100✔
4208
    groupPrevStart = pLast->wstart;
27,179,100✔
4209
  }
4210

4211
  code = extWinCheckMonotonicWstart(groupHasPrevStart, groupPrevStart, pParam->wstart, row);
31,008,616✔
4212
  if (code != TSDB_CODE_SUCCESS) {
32,062,652✔
NEW
4213
    TAOS_CHECK_EXIT(code);
×
4214
  }
4215

4216
  TSDB_CHECK_NULL(taosArrayPush(pGroupInfo->pParams, pParam), code, lino, _exit, terrno);
63,484,560✔
4217

4218
_exit:
31,421,908✔
4219
  return code;
31,573,408✔
4220
}
4221

4222
static int32_t extWinBuildNonGroupedCalcInfosFromBlocks(SArray* pBlocks, SStreamRuntimeFuncInfo* pRt,
329,260✔
4223
                                                        STimeWindow* pExtWinTimeRange) {
4224
  int32_t code = TSDB_CODE_SUCCESS;
329,260✔
4225
  int32_t lino = 0;
329,260✔
4226
  SColumnInfoData* pStartCol = NULL;
329,260✔
4227
  SColumnInfoData* pEndCol = NULL;
329,260✔
4228
  int32_t numRows = 0;
329,260✔
4229
  int32_t numCols = 0;
329,260✔
4230
  bool hasPrevStart = false;
329,260✔
4231
  int64_t prevStart = 0;
329,260✔
4232
  uint64_t prevGroupId = UINT64_MAX;
329,260✔
4233
  int32_t blockNum = taosArrayGetSize(pBlocks);
329,260✔
4234

4235
  for (int32_t blockIdx = 0; blockIdx < blockNum; ++blockIdx) {
662,964✔
4236
    SSDataBlock** ppOne = taosArrayGet(pBlocks, blockIdx);
334,512✔
4237
    TSDB_CHECK_NULL(ppOne, code, lino, _exit, TSDB_CODE_INVALID_PARA);
334,512✔
4238
    SSDataBlock* pBlock = *ppOne;
334,512✔
4239
    TSDB_CHECK_NULL(pBlock, code, lino, _exit, TSDB_CODE_INVALID_PARA);
334,512✔
4240

4241
    TAOS_CHECK_EXIT(extWinValidateNonStreamBlock(pBlock, &pStartCol, &pEndCol, &numRows, &numCols));
334,512✔
4242

4243
    uint64_t groupId = extWinGetRemoteResultGroupId(pBlock);
334,512✔
4244
    if (prevGroupId != groupId) {
334,512✔
4245
      hasPrevStart = false;
326,836✔
4246
      prevGroupId = groupId;
326,836✔
4247
    }
4248

4249
    for (int32_t row = 0; row < numRows; ++row) {
22,897,104✔
4250
      SSTriggerCalcParam param = {0};
22,563,400✔
4251
      TAOS_CHECK_EXIT(extWinBuildTriggerParamForRow(pBlock, pStartCol, pEndCol, numCols, row, &param));
22,562,188✔
4252

4253
      pExtWinTimeRange->skey = TMIN(pExtWinTimeRange->skey, param.wstart);
22,559,764✔
4254
      pExtWinTimeRange->ekey = TMAX(pExtWinTimeRange->ekey, param.wend);
22,560,168✔
4255

4256
      code = extWinAppendNonGroupedCalcParam(pRt, &param, &hasPrevStart, &prevStart, row);
22,559,360✔
4257
      if (code != TSDB_CODE_SUCCESS) {
22,563,804✔
4258
        tDestroySSTriggerCalcParam(&param);
808✔
4259
        TAOS_CHECK_EXIT(code);
808✔
4260
      }
4261
    }
4262
  }
4263

4264
_exit:
328,856✔
4265
  return code;
329,260✔
4266
}
4267

4268
static int32_t extWinBuildGroupedCalcInfosFromBlocks(SArray* pBlocks, SStreamRuntimeFuncInfo* pRt,
70,296✔
4269
                                                     STimeWindow* pExtWinTimeRange) {
4270
  int32_t code = TSDB_CODE_SUCCESS;
70,296✔
4271
  int32_t lino = 0;
70,296✔
4272
  SColumnInfoData* pStartCol = NULL;
70,296✔
4273
  SColumnInfoData* pEndCol = NULL;
70,296✔
4274
  int32_t numRows = 0;
70,296✔
4275
  int32_t numCols = 0;
70,296✔
4276
  int32_t blockNum = taosArrayGetSize(pBlocks);
70,296✔
4277

4278
  for (int32_t blockIdx = 0; blockIdx < blockNum; ++blockIdx) {
235,532✔
4279
    SSDataBlock** ppOne = taosArrayGet(pBlocks, blockIdx);
165,236✔
4280
    TSDB_CHECK_NULL(ppOne, code, lino, _exit, TSDB_CODE_INVALID_PARA);
165,236✔
4281
    SSDataBlock* pBlock = *ppOne;
165,236✔
4282
    TSDB_CHECK_NULL(pBlock, code, lino, _exit, TSDB_CODE_INVALID_PARA);
165,236✔
4283

4284
    TAOS_CHECK_EXIT(extWinValidateNonStreamBlock(pBlock, &pStartCol, &pEndCol, &numRows, &numCols));
165,236✔
4285

4286
    uint64_t groupId = extWinGetRemoteResultGroupId(pBlock);
165,236✔
4287
    for (int32_t row = 0; row < numRows; ++row) {
32,392,720✔
4288
      SSTriggerCalcParam param = {0};
32,217,788✔
4289
      TAOS_CHECK_EXIT(extWinBuildTriggerParamForRow(pBlock, pStartCol, pEndCol, numCols, row, &param));
32,197,992✔
4290

4291
      pExtWinTimeRange->skey = TMIN(pExtWinTimeRange->skey, param.wstart);
31,690,568✔
4292
      pExtWinTimeRange->ekey = TMAX(pExtWinTimeRange->ekey, param.wend);
31,610,980✔
4293

4294
      code = extWinAppendGroupedCalcParam(pRt, groupId, &param, row);
32,176,176✔
4295
      if (code != TSDB_CODE_SUCCESS) {
32,093,356✔
NEW
4296
        tDestroySSTriggerCalcParam(&param);
×
NEW
4297
        TAOS_CHECK_EXIT(code);
×
4298
      }
4299
    }
4300
  }
4301

NEW
4302
_exit:
×
4303
  return code;
70,296✔
4304
}
4305

4306
static int32_t extWinInitNonStreamWindowDataFromBlock(SExternalWindowPhysiNode* pPhynode, SExecTaskInfo* pTaskInfo, STimeWindow* pTimeRange) {
1,439,230✔
4307
  int32_t code = TSDB_CODE_SUCCESS;
1,439,230✔
4308
  int32_t     lino = 0;
1,439,230✔
4309
  SArray*     pBlocks = NULL;
1,439,230✔
4310
  STimeWindow extWinTimeRange = {.skey = INT64_MAX, .ekey = INT64_MIN};
1,439,230✔
4311

4312
  if (NULL == pPhynode->pSubquery || nodeType(pPhynode->pSubquery) != QUERY_NODE_REMOTE_TABLE) {
1,439,230✔
4313
    qDebug("invalid subquery in external window, pSubquery:%p, type:%d", pPhynode->pSubquery,
1,039,674✔
4314
           pPhynode->pSubquery ? nodeType(pPhynode->pSubquery) : -1);
4315
    return TSDB_CODE_SUCCESS;
1,039,674✔
4316
  }
4317
  TSDB_CHECK_NULL(pTaskInfo, code, lino, _exit, TSDB_CODE_INVALID_PARA);
399,556✔
4318

4319
  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
399,556✔
4320
    pTaskInfo->pStreamRuntimeInfo = (SStreamRuntimeInfo*)taosMemoryCalloc(1, sizeof(SStreamRuntimeInfo));
399,556✔
4321
    TSDB_CHECK_NULL(pTaskInfo->pStreamRuntimeInfo, code, lino, _exit, terrno);
399,556✔
4322
    pTaskInfo->ownStreamRtInfo = true;
399,556✔
4323
  }
4324

4325
  // Initialize basic runtime function parameters.
4326
  SStreamRuntimeFuncInfo* pRt = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
399,556✔
4327
  pRt->withExternalWindow = true;
399,556✔
4328
  pRt->isWindowTrigger = true;
399,556✔
4329
  pRt->triggerType = STREAM_TRIGGER_SESSION;
399,556✔
4330
  pRt->precision = 0;
399,556✔
4331
  pRt->curIdx = 0;
399,556✔
4332

4333
#if defined(BUILD_TEST)
4334
  // Test-only path:
4335
  // 1) route by subquery source db name and build mock external-window blocks;
4336
  // 2) non-test db names are treated as unsupported since production interface is not implemented yet.
4337
  pBlocks = extWinGetSSDataBlocksInTest(pPhynode);
4338
  TSDB_CHECK_NULL(pBlocks, code, lino, _exit, terrno);
4339
#else
4340
  // get the block with external window values from subquery, for now just return error since this code
4341
  // path is only for non-stream query which is not supported yet.
4342
  
4343
  SRemoteTableNode* pRemote = (SRemoteTableNode*)pPhynode->pSubquery;
399,556✔
4344
  TAOS_CHECK_EXIT(qFetchRemoteNode(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, (SNode*)pRemote));
399,556✔
4345

4346
  pBlocks = pRemote->pResBlks;
399,556✔
4347
#endif
4348
  // For non-stream external_window, whether trigger windows should be built as
4349
  // grouped calc infos must follow the OUTER query partition semantics instead of
4350
  // raw remote block ids. Remote subquery result blocks may carry non-zero ids
4351
  // even when the subquery is semantically non-grouped, which would incorrectly
4352
  // split one logical external window into multiple TGrps.
4353
  bool groupedRemoteResult = pPhynode->calcWithPartition && hasGroupedRemoteResult(pBlocks);
399,556✔
4354
  pRt->isMultiGroupCalc = groupedRemoteResult ? 1 : 0;
399,556✔
4355
  pRt->curGrpCalc = NULL;
399,556✔
4356
  pRt->groupId = 0;
399,556✔
4357

4358
  // Initialize/reset pseudo function values.
4359
  if (pRt->pStreamPesudoFuncVals == NULL) {
399,556✔
4360
    pRt->pStreamPesudoFuncVals = taosArrayInit(4, sizeof(SSTriggerCalcParam));
399,556✔
4361
    TSDB_CHECK_NULL(pRt->pStreamPesudoFuncVals, code, lino, _exit, terrno);
399,556✔
4362
  } else {
NEW
4363
    taosArrayClearEx(pRt->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
×
4364
  }
4365

4366
  if (pRt->pGroupCalcInfos != NULL) {
399,556✔
NEW
4367
    tSimpleHashCleanup(pRt->pGroupCalcInfos);
×
NEW
4368
    pRt->pGroupCalcInfos = NULL;
×
4369
  }
4370

4371
  if (groupedRemoteResult) {
399,556✔
4372
    pRt->pGroupCalcInfos = tSimpleHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
70,296✔
4373
    TSDB_CHECK_NULL(pRt->pGroupCalcInfos, code, lino, _exit, terrno);
70,296✔
4374
    tSimpleHashSetFreeFp(pRt->pGroupCalcInfos, tDestroySSTriggerGroupCalcInfo);
70,296✔
4375
  }
4376
  
4377
  if (groupedRemoteResult) {
399,556✔
4378
    TAOS_CHECK_EXIT(extWinBuildGroupedCalcInfosFromBlocks(pBlocks, pRt, &extWinTimeRange));
70,296✔
4379
    // The grouped path uses per-group pParams (in pGroupCalcInfos) instead of
4380
    // pStreamPesudoFuncVals.  extWinInitCGrpCtx later overwrites the pointer,
4381
    // orphaning the original SArray.  Free it now while we still can.
4382
    taosArrayDestroy(pRt->pStreamPesudoFuncVals);
70,296✔
4383
    pRt->pStreamPesudoFuncVals = NULL;
70,296✔
4384
  } else {
4385
    TAOS_CHECK_EXIT(extWinBuildNonGroupedCalcInfosFromBlocks(pBlocks, pRt, &extWinTimeRange));
329,260✔
4386
  }
4387

4388
  if (pTimeRange != NULL) {
398,748✔
4389
    *pTimeRange = extWinTimeRange;
398,748✔
4390
  }
4391

4392
  if (!groupedRemoteResult && taosArrayGetSize(pRt->pStreamPesudoFuncVals) > 0) {
398,748✔
4393
    qInfo("%s non-stream extWin mock initialized from block, winNum:%d, firstWin:[%" PRId64 ", %" PRId64 "], wholeRange:[%" PRId64 ", %" PRId64 "]",
326,028✔
4394
          GET_TASKID(pTaskInfo), (int32_t)taosArrayGetSize(pRt->pStreamPesudoFuncVals),
4395
          ((SSTriggerCalcParam*)taosArrayGet(pRt->pStreamPesudoFuncVals, 0))->wstart,
4396
          ((SSTriggerCalcParam*)taosArrayGet(pRt->pStreamPesudoFuncVals, 0))->wend,
4397
          extWinTimeRange.skey, extWinTimeRange.ekey);
4398
  } else if (groupedRemoteResult) {
72,720✔
4399
    qInfo("%s non-stream extWin initialized from grouped remote result, groupNum:%d, wholeRange:[%" PRId64 ", %" PRId64 "]",
70,296✔
4400
          GET_TASKID(pTaskInfo), pRt->pGroupCalcInfos ? tSimpleHashGetSize(pRt->pGroupCalcInfos) : 0,
4401
          extWinTimeRange.skey, extWinTimeRange.ekey);
4402
  }
4403

4404
_exit:
399,556✔
4405
  if (pBlocks) {
399,556✔
4406
    for (int32_t blockIdx = 0; blockIdx < taosArrayGetSize(pBlocks); ++blockIdx) {
909,000✔
4407
      SSDataBlock** ppOne = taosArrayGet(pBlocks, blockIdx);
511,868✔
4408
      if (ppOne && *ppOne) {
511,868✔
4409
        blockDataDestroy(*ppOne);
511,868✔
4410
      }
4411
    }
4412
    taosArrayDestroy(pBlocks);
397,132✔
4413
  }
4414
  return code;
399,556✔
4415
}
4416

4417
#if defined(BUILD_TEST)
4418
// mockSSDataBlock is a helper function to create a sample SSDataBlock for testing purposes.
4419
static SSDataBlock* mockSSDataBlock() {
4420
  SSDataBlock* pBlock = NULL;
4421
  int32_t      code = createDataBlock(&pBlock);
4422
  if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
4423
    return NULL;
4424
  }
4425

4426
  // Add 4 columns: timestamp, timestamp, int, bigint.
4427
  SColumnInfoData col1 = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
4428
  code = blockDataAppendColInfo(pBlock, &col1);
4429
  if (code != TSDB_CODE_SUCCESS) {
4430
    blockDataDestroy(pBlock);
4431
    return NULL;
4432
  }
4433

4434
  SColumnInfoData col2 = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 2);
4435
  code = blockDataAppendColInfo(pBlock, &col2);
4436
  if (code != TSDB_CODE_SUCCESS) {
4437
    blockDataDestroy(pBlock);
4438
    return NULL;
4439
  }
4440

4441
  SColumnInfoData col3 = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
4442
  code = blockDataAppendColInfo(pBlock, &col3);
4443
  if (code != TSDB_CODE_SUCCESS) {
4444
    blockDataDestroy(pBlock);
4445
    return NULL;
4446
  }
4447

4448
  SColumnInfoData col4 = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, 8, 4);
4449
  code = blockDataAppendColInfo(pBlock, &col4);
4450
  if (code != TSDB_CODE_SUCCESS) {
4451
    blockDataDestroy(pBlock);
4452
    return NULL;
4453
  }
4454

4455
  // Ensure capacity for all rows.
4456
  code = blockDataEnsureCapacity(pBlock, 2);
4457
  if (code != TSDB_CODE_SUCCESS) {
4458
    blockDataDestroy(pBlock);
4459
    return NULL;
4460
  }
4461

4462
  // Get column data pointers.
4463
  SColumnInfoData* pCol1 = taosArrayGet(pBlock->pDataBlock, 0);
4464
  SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1);
4465
  SColumnInfoData* pCol3 = taosArrayGet(pBlock->pDataBlock, 2);
4466
  SColumnInfoData* pCol4 = taosArrayGet(pBlock->pDataBlock, 3);
4467
  if (pCol1 == NULL || pCol2 == NULL || pCol3 == NULL || pCol4 == NULL) {
4468
    blockDataDestroy(pBlock);
4469
    return NULL;
4470
  }
4471

4472
  // Row 1 values.
4473
  int64_t ts1 = 1589335200000;
4474
  int64_t ts2 = 1589338140000;
4475
  int32_t intVal1 = 100;
4476
  int64_t bigintVal1 = 1000000LL;
4477

4478
  code = colDataSetVal(pCol1, 0, (const char*)&ts1, false);
4479
  if (code != TSDB_CODE_SUCCESS) {
4480
    blockDataDestroy(pBlock);
4481
    return NULL;
4482
  }
4483
  code = colDataSetVal(pCol2, 0, (const char*)&ts2, false);
4484
  if (code != TSDB_CODE_SUCCESS) {
4485
    blockDataDestroy(pBlock);
4486
    return NULL;
4487
  }
4488
  code = colDataSetVal(pCol3, 0, (const char*)&intVal1, false);
4489
  if (code != TSDB_CODE_SUCCESS) {
4490
    blockDataDestroy(pBlock);
4491
    return NULL;
4492
  }
4493
  code = colDataSetVal(pCol4, 0, (const char*)&bigintVal1, false);
4494
  if (code != TSDB_CODE_SUCCESS) {
4495
    blockDataDestroy(pBlock);
4496
    return NULL;
4497
  }
4498
  pBlock->info.rows++;
4499

4500
  // Row 2 values.
4501
  int64_t ts3 = 1589338140001;
4502
  int64_t ts4 = 1589340110000;
4503
  int32_t intVal2 = 200;
4504
  int64_t bigintVal2 = 2000000LL;
4505

4506
  code = colDataSetVal(pCol1, 1, (const char*)&ts3, false);
4507
  if (code != TSDB_CODE_SUCCESS) {
4508
    blockDataDestroy(pBlock);
4509
    return NULL;
4510
  }
4511
  code = colDataSetVal(pCol2, 1, (const char*)&ts4, false);
4512
  if (code != TSDB_CODE_SUCCESS) {
4513
    blockDataDestroy(pBlock);
4514
    return NULL;
4515
  }
4516
  code = colDataSetVal(pCol3, 1, (const char*)&intVal2, false);
4517
  if (code != TSDB_CODE_SUCCESS) {
4518
    blockDataDestroy(pBlock);
4519
    return NULL;
4520
  }
4521
  code = colDataSetVal(pCol4, 1, (const char*)&bigintVal2, false);
4522
  if (code != TSDB_CODE_SUCCESS) {
4523
    blockDataDestroy(pBlock);
4524
    return NULL;
4525
  }
4526
  pBlock->info.rows++;
4527

4528
  pBlock->info.id.groupId = 0;
4529
  pBlock->info.id.baseGId = pBlock->info.id.groupId;
4530

4531
  return pBlock;
4532
}
4533

4534
static int32_t extWinMockSSDataBlocksWithGroups(SArray** ppBlocks) {
4535
  int32_t      code = TSDB_CODE_SUCCESS;
4536
  int32_t      lino = 0;
4537
  SArray*      pBlocks = NULL;
4538
  SSDataBlock* pBlock1 = NULL;
4539
  SSDataBlock* pBlock2 = NULL;
4540

4541
  TSDB_CHECK_NULL(ppBlocks, code, lino, _exit, TSDB_CODE_INVALID_PARA);
4542

4543
  pBlocks = taosArrayInit(2, POINTER_BYTES);
4544
  TSDB_CHECK_NULL(pBlocks, code, lino, _exit, terrno);
4545

4546
  pBlock1 = mockSSDataBlock();
4547
  TSDB_CHECK_NULL(pBlock1, code, lino, _exit, terrno);
4548
  pBlock1->info.id.groupId = 1001;
4549
  pBlock1->info.id.baseGId = pBlock1->info.id.groupId;
4550

4551
  pBlock2 = mockSSDataBlock();
4552
  TSDB_CHECK_NULL(pBlock2, code, lino, _exit, terrno);
4553
  pBlock2->info.id.groupId = 1002;
4554
  pBlock2->info.id.baseGId = pBlock2->info.id.groupId;
4555

4556
  SColumnInfoData* pG2Start = taosArrayGet(pBlock2->pDataBlock, 0);
4557
  SColumnInfoData* pG2End = taosArrayGet(pBlock2->pDataBlock, 1);
4558
  SColumnInfoData* pG2Int = taosArrayGet(pBlock2->pDataBlock, 2);
4559
  SColumnInfoData* pG2BigInt = taosArrayGet(pBlock2->pDataBlock, 3);
4560
  TSDB_CHECK_NULL(pG2Start, code, lino, _exit, TSDB_CODE_INVALID_PARA);
4561
  TSDB_CHECK_NULL(pG2End, code, lino, _exit, TSDB_CODE_INVALID_PARA);
4562
  TSDB_CHECK_NULL(pG2Int, code, lino, _exit, TSDB_CODE_INVALID_PARA);
4563
  TSDB_CHECK_NULL(pG2BigInt, code, lino, _exit, TSDB_CODE_INVALID_PARA);
4564

4565
  for (int32_t row = 0; row < pBlock2->info.rows; ++row) {
4566
    int64_t startTs = *(int64_t*)colDataGetData(pG2Start, row);
4567
    int64_t endTs = *(int64_t*)colDataGetData(pG2End, row);
4568
    int32_t intVal = *(int32_t*)colDataGetData(pG2Int, row);
4569
    int64_t bigIntVal = *(int64_t*)colDataGetData(pG2BigInt, row);
4570

4571
    startTs += 3600000;
4572
    endTs += 3600000;
4573
    intVal += 100;
4574
    bigIntVal += 1000000;
4575

4576
    TAOS_CHECK_EXIT(colDataSetVal(pG2Start, row, (const char*)&startTs, false));
4577
    TAOS_CHECK_EXIT(colDataSetVal(pG2End, row, (const char*)&endTs, false));
4578
    TAOS_CHECK_EXIT(colDataSetVal(pG2Int, row, (const char*)&intVal, false));
4579
    TAOS_CHECK_EXIT(colDataSetVal(pG2BigInt, row, (const char*)&bigIntVal, false));
4580
  }
4581

4582
  TSDB_CHECK_NULL(taosArrayPush(pBlocks, &pBlock1), code, lino, _exit, terrno);
4583
  TSDB_CHECK_NULL(taosArrayPush(pBlocks, &pBlock2), code, lino, _exit, terrno);
4584

4585
  *ppBlocks = pBlocks;
4586
  return code;
4587

4588
_exit:
4589
  if (pBlock1) {
4590
    blockDataDestroy(pBlock1);
4591
  }
4592
  if (pBlock2) {
4593
    blockDataDestroy(pBlock2);
4594
  }
4595
  if (pBlocks) {
4596
    taosArrayDestroy(pBlocks);
4597
  }
4598
  return code;
4599
}
4600

4601
typedef enum {
4602
  EXT_WIN_TEST_MOCK_UNSUPPORTED = 0,
4603
  EXT_WIN_TEST_MOCK_SINGLE_BLOCK,
4604
  EXT_WIN_TEST_MOCK_GROUP_BLOCKS,
4605
} EExtWinTestMockMode;
4606

4607
typedef struct {
4608
  EExtWinTestMockMode mode;
4609
} SExtWinTestMockCtx;
4610

4611
static bool extWinDetectTestMockModeFromPhysiNode(SPhysiNode* pNode, SExtWinTestMockCtx* pCtx) {
4612
  if (pNode == NULL || pCtx == NULL) {
4613
    return false;
4614
  }
4615

4616
  if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
4617
    STableScanPhysiNode* pScan = (STableScanPhysiNode*)pNode;
4618
    const char*          pDbName = tNameGetDbNameP(&pScan->scan.tableName);
4619
    if (pDbName == NULL) {
4620
      pCtx->mode = EXT_WIN_TEST_MOCK_UNSUPPORTED;
4621
      return true;
4622
    }
4623

4624
    if (0 == strcasecmp(pDbName, "external_window_test_single_block")) {
4625
      pCtx->mode = EXT_WIN_TEST_MOCK_SINGLE_BLOCK;
4626
    } else if (0 == strcasecmp(pDbName, "external_window_test_group_blocks")) {
4627
      pCtx->mode = EXT_WIN_TEST_MOCK_GROUP_BLOCKS;
4628
    } else {
4629
      pCtx->mode = EXT_WIN_TEST_MOCK_UNSUPPORTED;
4630
    }
4631

4632
    return true;
4633
  }
4634

4635
  SNode* pChild = NULL;
4636
  FOREACH(pChild, pNode->pChildren) {
4637
    if (extWinDetectTestMockModeFromPhysiNode((SPhysiNode*)pChild, pCtx)) {
4638
      return true;
4639
    }
4640
  }
4641

4642
  return false;
4643
}
4644

4645
static SArray* extWinGetSSDataBlocksInTest(SExternalWindowPhysiNode* pPhynode) {
4646
  int32_t code = TSDB_CODE_SUCCESS;
4647
  int32_t lino = 0;
4648
  if (pPhynode == NULL || pPhynode->pSubquery == NULL) {
4649
    terrno = TSDB_CODE_INVALID_PARA;
4650
    return NULL;
4651
  }
4652

4653
  SExtWinTestMockCtx ctx = {.mode = EXT_WIN_TEST_MOCK_UNSUPPORTED};
4654
  SNode*             pChild = NULL;
4655
  FOREACH(pChild, pPhynode->window.node.pChildren) {
4656
    if (extWinDetectTestMockModeFromPhysiNode((SPhysiNode*)pChild, &ctx)) {
4657
      break;
4658
    }
4659
  }
4660

4661
  if (ctx.mode == EXT_WIN_TEST_MOCK_SINGLE_BLOCK) {
4662
    SArray*      pBlocks = taosArrayInit(1, POINTER_BYTES);
4663
    SSDataBlock* pBlock = NULL;
4664
    if (pBlocks == NULL) {
4665
      return NULL;
4666
    }
4667

4668
    pBlock = mockSSDataBlock();
4669
    if (pBlock == NULL) {
4670
      taosArrayDestroy(pBlocks);
4671
      return NULL;
4672
    }
4673

4674
    if (taosArrayPush(pBlocks, &pBlock) == NULL) {
4675
      blockDataDestroy(pBlock);
4676
      taosArrayDestroy(pBlocks);
4677
      return NULL;
4678
    }
4679

4680
    return pBlocks;
4681
  }
4682

4683
  if (ctx.mode == EXT_WIN_TEST_MOCK_GROUP_BLOCKS) {
4684
    SArray* pBlocks = NULL;
4685
    int32_t code = extWinMockSSDataBlocksWithGroups(&pBlocks);
4686
    TAOS_CHECK_EXIT(code);
4687
    return pBlocks;
4688
  }
4689

4690
    if (NULL == pPhynode->pSubquery || nodeType(pPhynode->pSubquery) != QUERY_NODE_REMOTE_TABLE) {
4691
    qError("invalid subquery in external window, pSubquery:%p, type:%d", pPhynode->pSubquery, pPhynode->pSubquery ? nodeType(pPhynode->pSubquery) : -1);
4692
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
4693
    TAOS_CHECK_EXIT(code);
4694
  }
4695
  
4696
  SRemoteTableNode* pRemote = (SRemoteTableNode*)pPhynode->pSubquery;
4697
  TAOS_CHECK_EXIT(qFetchRemoteNode(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, (SNode*)pRemote));
4698

4699
  return pRemote->pResBlks;
4700
_exit:
4701
  if (code != TSDB_CODE_SUCCESS) {
4702
    terrno = code;
4703
    qError("%s : %d error code:%s", __func__, lino, tstrerror(code));
4704
  }
4705

4706
  return NULL;
4707
}
4708
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc