• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.99
/source/libs/executor/src/externalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22

23
typedef struct SBlockList {
24
  const SSDataBlock* pSrcBlock;
25
  SList*             pBlocks;
26
  int32_t            blockRowNumThreshold;
27
} SBlockList;
28

29
typedef struct SExternalWindowOperator {
30
  SOptrBasicInfo     binfo;
31
  SAggSupporter      aggSup;
32
  SExprSupp          scalarSupp;
33
  STimeWindowAggSupp twAggSup;
34
  SGroupResInfo      groupResInfo;
35
  int32_t            primaryTsIndex;
36
  EExtWinMode        mode;
37
  SArray*            pWins;
38
  SArray*            pOutputBlocks;  // for each window, we have a list of blocks
39
  // SArray*            pOffsetList;  // for each window
40
  int32_t            lastWinId;
41
  int32_t            outputWinId;
42
  SListNode*         pOutputBlockListNode;  // block index in block array used for output
43
  SSDataBlock*       pTmpBlock;
44
  SSDataBlock*       pEmptyInputBlock;
45
  SArray*            pPseudoColInfo;  
46
  bool               hasCountFunc;
47
  // SLimitInfo         limitInfo;  // limit info for each window
48
} SExternalWindowOperator;
49

50
/*
 * Prepare a block list for use by allocating its (initially empty) list.
 *
 * pBlockList  list to initialise; pSrcBlock is left for the caller to set
 * threshold   row capacity to reserve for every block added later. A
 *             non-positive value keeps whatever blockRowNumThreshold the
 *             caller has already stored in the struct.
 *
 * Returns 0 on success, or terrno when the list cannot be allocated.
 */
static int32_t blockListInit(SBlockList* pBlockList, int32_t threshold) {
  pBlockList->pBlocks = tdListNew(sizeof(SSDataBlock*));
  if (pBlockList->pBlocks == NULL) {
    return terrno;
  }

  // The parameter used to be ignored even though blockListAddBlock() sizes
  // new blocks from blockRowNumThreshold; record it when it is meaningful.
  if (threshold > 0) {
    pBlockList->blockRowNumThreshold = threshold;
  }
  return 0;
}
57

58
/*
 * Append one new block to the list. The block is created from
 * pBlockList->pSrcBlock and given capacity for blockRowNumThreshold rows.
 *
 * Returns 0 on success; on any failure the half-built block is destroyed and
 * the failing call's error code is returned.
 */
static int32_t blockListAddBlock(SBlockList* pBlockList) {
  SSDataBlock* pNewBlock = NULL;

  int32_t ret = createOneDataBlock(pBlockList->pSrcBlock, false, &pNewBlock);
  if (ret != 0) {
    return ret;
  }

  ret = blockDataEnsureCapacity(pNewBlock, pBlockList->blockRowNumThreshold);
  if (ret == 0) {
    // the list stores the block pointer itself, hence &pNewBlock
    ret = tdListAppend(pBlockList->pBlocks, &pNewBlock);
  }

  if (ret != 0) {
    blockDataDestroy(pNewBlock);
  }
  return ret;
}
76

77
/*
 * Append a fresh block to the list and hand it back through ppBlock.
 *
 * Note that a new block is added on every call; the block that was the tail
 * before the call is never returned.
 *
 * Returns 0 on success. On failure *ppBlock is NULL and the error code from
 * blockListAddBlock() is returned.
 */
static int32_t blockListGetLastBlock(SBlockList* pBlockList, SSDataBlock** ppBlock) {
  *ppBlock = NULL;

  int32_t ret = blockListAddBlock(pBlockList);
  if (ret != 0) {
    return ret;
  }

  SListNode* pTail = TD_DLIST_TAIL(pBlockList->pBlocks);
  *ppBlock = *(SSDataBlock**)pTail->data;
  return ret;
}
88

89
static void blockListDestroy(void* p) {
263,149✔
90
  SBlockList* pBlockList = (SBlockList*)p;
263,149✔
91
  if (TD_DLIST_NELES(pBlockList->pBlocks) > 0) {
263,149!
92
    SListNode* pNode = TD_DLIST_HEAD(pBlockList->pBlocks);
×
93
    while (pNode) {
×
94
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
95
      blockDataDestroy(pBlock);
×
96
      pNode = pNode->dl_next_;
×
97
    }
98
  }
99
  taosMemoryFree(pBlockList->pBlocks);
263,149!
100
}
263,149✔
101

102
void destroyExternalWindowOperatorInfo(void* param) {
128✔
103
  if (NULL == param) {
128!
104
    return;
×
105
  }
106
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
128✔
107
  cleanupBasicInfo(&pInfo->binfo);
128✔
108

109
  taosArrayDestroyEx(pInfo->pOutputBlocks, blockListDestroy);
128✔
110
  taosArrayDestroy(pInfo->pWins);
128✔
111
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
128✔
112
  cleanupGroupResInfo(&pInfo->groupResInfo);
128✔
113
 
114
  taosArrayDestroy(pInfo->pPseudoColInfo);
128✔
115
  blockDataDestroy(pInfo->pTmpBlock);
128✔
116
  blockDataDestroy(pInfo->pEmptyInputBlock);
128✔
117

118
  cleanupAggSup(&pInfo->aggSup);
128✔
119
  cleanupExprSupp(&pInfo->scalarSupp);
128✔
120

121
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
128✔
122

123
  taosMemoryFreeClear(pInfo);
128!
124
}
125

126
static int32_t doOpenExternalWindow(SOperatorInfo* pOperator);
127
static int32_t externalWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
128

129
typedef struct SMergeAlignedExternalWindowOperator {
130
  SExternalWindowOperator* pExtW;
131
  int64_t curTs;
132
  SSDataBlock* pPrefetchedBlock;
133
  SResultRow*  pResultRow;
134
} SMergeAlignedExternalWindowOperator;
135

136
void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
×
137
  SMergeAlignedExternalWindowOperator* pMlExtInfo = (SMergeAlignedExternalWindowOperator*)pOperator;
×
138
  destroyExternalWindowOperatorInfo(pMlExtInfo->pExtW);
×
139
}
×
140

141
/* Does the per-call work for mergeAlignedExternalWindowNext(), which invokes it after the window list is built. */
void doMergeAlignExternalWindow(SOperatorInfo* pOperator);
142

143
/*
 * Get-next callback of the merge-aligned external-window operator.
 *
 * On the first call after creation or reset, the window list is built from
 * the stream runtime's trigger parameters (one STimeWindow per parameter).
 * doMergeAlignExternalWindow() then fills the result block.
 *
 * pOperator  the operator
 * ppRes      receives the result block, or NULL when no rows were produced
 *            or the operator has already finished
 *
 * Returns TSDB_CODE_SUCCESS. On failure the error is stored in
 * pTaskInfo->code and control leaves through T_LONG_JMP.
 */
static int32_t mergeAlignedExternalWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  int32_t                              code = 0;
  int32_t                              lino = 0;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pRes = pExtW->binfo.pRes;
  blockDataCleanup(pRes);

  if (!pExtW->pWins) {
    SArray* pCalcParams = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals;
    size_t  size = taosArrayGetSize(pCalcParams);

    pExtW->pWins = taosArrayInit(size, sizeof(STimeWindow));
    // QUERY_CHECK_NULL stores terrno in `code`; the former
    // QUERY_CHECK_CODE(terrno, ...) jumped to _end with code still 0, so an
    // allocation failure was reported as success.
    QUERY_CHECK_NULL(pExtW->pWins, code, lino, _end, terrno);

    for (size_t i = 0; i < size; ++i) {
      SSTriggerCalcParam* pParam = taosArrayGet(pCalcParams, i);
      QUERY_CHECK_NULL(pParam, code, lino, _end, terrno);

      STimeWindow win = {.skey = pParam->wstart, .ekey = pParam->wend};
      if (pTaskInfo->pStreamRuntimeInfo->funcInfo.triggerType != 1) {  // 1 means STREAM_TRIGGER_SLIDING
        // NOTE(review): presumably widens an inclusive wend into the
        // exclusive end key that the window lookups compare against - confirm.
        win.ekey++;
      }
      TSDB_CHECK_NULL(taosArrayPush(pExtW->pWins, &win), code, lino, _end, terrno);
    }
    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  }

  doMergeAlignExternalWindow(pOperator);
  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;
  (*ppRes) = (rows == 0) ? NULL : pRes;

_end:
  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
189

190
/*
 * Reset-state callback of the merge-aligned external-window operator.
 * Drops the window list, clears the per-run cursor fields and rebuilds the
 * aggregate support and the time-window data column.
 *
 * Returns 0 on success, otherwise the error code of resetAggSup() or
 * initExecTimeWindowInfo().
 */
int32_t resetMergeAlignedExternalWindowOperator(SOperatorInfo* pOperator) {
  SExecTaskInfo*                       pTask = pOperator->pTaskInfo;
  SMergeAlignedIntervalPhysiNode*      pPlan = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;
  SMergeAlignedExternalWindowOperator* pMerged = pOperator->info;
  SExternalWindowOperator*             pInner = pMerged->pExtW;

  pOperator->status = OP_NOT_OPENED;

  // the window list is rebuilt lazily by the next get-next call
  taosArrayDestroy(pInner->pWins);
  pInner->pWins = NULL;

  resetBasicOperatorState(&pInner->binfo);
  pMerged->pResultRow = NULL;
  pMerged->curTs = INT64_MIN;
  if (pMerged->pPrefetchedBlock != NULL) {
    blockDataCleanup(pMerged->pPrefetchedBlock);
  }

  int32_t ret = resetAggSup(&pOperator->exprSupp, &pInner->aggSup, pTask, pPlan->window.pFuncs, NULL,
                            sizeof(int64_t) * 2 + POINTER_BYTES, pTask->id.str, pTask->streamInfo.pState,
                            &pTask->storageAPI.functionStore);
  if (ret != 0) {
    return ret;
  }

  colDataDestroy(&pInner->twAggSup.timeWindowData);
  return initExecTimeWindowInfo(&pInner->twAggSup.timeWindowData, &pTask->window);
}
214

215
int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
×
216
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
217
  SMergeAlignedIntervalPhysiNode* pPhynode = (SMergeAlignedIntervalPhysiNode*)pNode;
×
218
  int32_t code = 0;
×
219
  int32_t lino = 0;
×
220
  SMergeAlignedExternalWindowOperator* pMlExtInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
×
221
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
222

223
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
×
224
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
×
225
  }
226
  pOperator->pPhyNode = pNode;
×
227
  if (!pMlExtInfo || !pOperator) {
×
228
    code = terrno;
×
229
    goto _error;
×
230
  }
231

232
  pMlExtInfo->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
×
233
  if (!pMlExtInfo->pExtW) {
×
234
    code = terrno;
×
235
    goto _error;
×
236
  }
237

238
  SExternalWindowOperator* pExtW = pMlExtInfo->pExtW;
×
239
  SExprSupp* pSup = &pOperator->exprSupp;
×
240
  pSup->hasWindowOrGroup = true;
×
241
  pMlExtInfo->curTs = INT64_MIN;
×
242

243
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
×
244
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
×
245
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
×
246
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
×
247

248
  // pExtW->limitInfo = (SLimitInfo){0};
249
  // initLimitInfo(pPhynode->window.node.pLimit, pPhynode->window.node.pSlimit, &pExtW->limitInfo);
250

251
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
252
  initResultSizeInfo(&pOperator->resultInfo, 512);
×
253

254
  int32_t num = 0;
×
255
  SExprInfo* pExprInfo = NULL;
×
256
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
×
257
  QUERY_CHECK_CODE(code, lino, _error);
×
258

259
  if (pExtW->mode == EEXT_MODE_AGG) {
×
260
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
261
                      &pTaskInfo->storageAPI.functionStore);
262
    QUERY_CHECK_CODE(code, lino, _error);
×
263
  }
264

265
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
×
266
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
267
  initBasicInfo(&pExtW->binfo, pResBlock);
×
268

269
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
×
270
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
×
271
  QUERY_CHECK_CODE(code, lino, _error);
×
272
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMlExtInfo, pTaskInfo);
×
273
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedExternalWindowNext, NULL,
×
274
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
275
                                         optrDefaultGetNextExtFn, NULL);
276
  setOperatorResetStateFn(pOperator, resetMergeAlignedExternalWindowOperator);
×
277

278
  code = appendDownstream(pOperator, &pDownstream, 1);
×
279
  QUERY_CHECK_CODE(code, lino, _error);
×
280
  *ppOptrOut = pOperator;
×
281
  return code;
×
282
_error:
×
283
  if (pMlExtInfo) destroyMergeAlignedExternalWindowOperator(pMlExtInfo);
×
284
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
285
  pTaskInfo->code = code;
×
286
  return code;
×
287
}
288

289
/*
 * Reset-state callback of the external-window operator. Clears the window
 * cursors, drops the window list and the per-window output lists, and rebuilds
 * the aggregate and scalar expression support.
 *
 * The rebuild steps run in order and stop at the first failure; the scratch
 * block is destroyed regardless of the outcome.
 *
 * Returns 0 on success, otherwise the error code of the step that failed.
 */
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
  SExternalWindowOperator*  pState = pOperator->info;
  SExecTaskInfo*            pTask = pOperator->pTaskInfo;
  SExternalWindowPhysiNode* pPlan = (SExternalWindowPhysiNode*)pOperator->pPhyNode;
  int32_t                   ret = 0;

  pOperator->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pState->binfo);
  pState->outputWinId = 0;
  pState->lastWinId = -1;

  taosArrayDestroyEx(pState->pOutputBlocks, blockListDestroy);
  taosArrayDestroy(pState->pWins);
  pState->pWins = NULL;
  pState->pOutputBlockListNode = NULL;
  pState->pOutputBlocks = NULL;

  initResultSizeInfo(&pOperator->resultInfo, 512);

  do {
    ret = blockDataEnsureCapacity(pState->binfo.pRes, pOperator->resultInfo.capacity);
    if (ret != 0) break;

    ret = resetAggSup(&pOperator->exprSupp, &pState->aggSup, pTask, pPlan->window.pFuncs, NULL,
                      sizeof(int64_t) * 2 + POINTER_BYTES, pTask->id.str, pTask->streamInfo.pState,
                      &pTask->storageAPI.functionStore);
    if (ret != 0) break;

    ret = resetExprSupp(&pState->scalarSupp, pTask, pPlan->window.pProjs, NULL, &pTask->storageAPI.functionStore);
    if (ret != 0) break;

    colDataDestroy(&pState->twAggSup.timeWindowData);
    ret = initExecTimeWindowInfo(&pState->twAggSup.timeWindowData, &pTask->window);
    if (ret != 0) break;

    cleanupGroupResInfo(&pState->groupResInfo);
  } while (0);

  blockDataDestroy(pState->pTmpBlock);
  pState->pTmpBlock = NULL;
  return ret;
}
327

328
/*
 * Node-walker callback: flags the presence of a count-like function.
 *
 * pNode  node being visited
 * res    bool* that is set to true when pNode is a function node whose
 *        function, or whose original function, is count-like
 *
 * Returns DEAL_RES_END once a match is found, otherwise DEAL_RES_CONTINUE.
 */
static EDealRes extWindowHasCountLikeFunc(SNode* pNode, void* res) {
  if (nodeType(pNode) != QUERY_NODE_FUNCTION) {
    return DEAL_RES_CONTINUE;
  }

  SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
  bool           countLike = fmIsCountLikeFunc(pFuncNode->funcId);
  if (!countLike && pFuncNode->hasOriginalFunc) {
    countLike = fmIsCountLikeFunc(pFuncNode->originalFuncId);
  }
  if (!countLike) {
    return DEAL_RES_CONTINUE;
  }

  *(bool*)res = true;
  return DEAL_RES_END;
}
338

339

340
/*
 * Build a one-row block whose columns are all NULL-typed and NULL-valued.
 *
 * Nothing is created (and *ppBlock is left untouched) when
 * tsCountAlwaysReturnValue is off or the operator has no count-like function.
 * Otherwise the block gets enough columns to cover the highest slot id
 * referenced by any column parameter of the operator's expressions.
 *
 * pOperator  operator whose exprSupp describes the expressions
 * ppBlock    receives the new block on success
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the step that failed (the
 * partially built block is destroyed in that case).
 */
static int32_t extWindowCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pBlock = NULL;
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SExternalWindowOperator* pExtW = pOperator->info;

  if (!pExtW->hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        continue;  // value parameters do not need a column
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId >= numOfCols) {
        code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
        QUERY_CHECK_CODE(code, lino, _end);

        // pad the block with NULL columns up to and including slotId
        for (int32_t k = numOfCols; k < slotId + 1; ++k) {
          void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
          QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
        }
      }
    }
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
    colDataSetNULL(pColInfoData, 0);
  }
  *ppBlock = pBlock;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    // report the line recorded by the failing check, not the line of this log call
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
406

407
/*
 * Create the external-window operator.
 *
 * The operator mode is derived from the plan node: scalar when pProjs is set,
 * indefinite-rows when indefRowsFunc is set, aggregate otherwise. Each mode
 * sets up its own expression and aggregate support.
 *
 * pDownstream  downstream operator; attached on success, destroyed together
 *              with the new operator on failure
 * pNode        physical plan node (a SExternalWindowPhysiNode)
 * pTaskInfo    owning task; pTaskInfo->code is set on failure
 * pOptrOut     receives the new operator on success
 *
 * Returns 0 on success, otherwise the error code of the step that failed.
 */
int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
                                     SOperatorInfo** pOptrOut) {
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
  QRY_PARAM_CHECK(pOptrOut);
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (!pExtW || !pOperator) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }
  // Must come after the allocation check: pOperator used to be dereferenced
  // here while it could still be NULL.
  pOperator->pPhyNode = pNode;

  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
                  pExtW, pTaskInfo);

  if (pTaskInfo->pStreamRuntimeInfo != NULL) {
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
  }
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pExtW->binfo, pResBlock);

  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
  // input order is forced to ascending, both in the plan node and in the operator
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;

  if (pPhynode->window.pProjs) {
    // scalar mode: only the projection expressions are needed
    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  } else if (pExtW->mode == EEXT_MODE_AGG) {
    if (pPhynode->window.pExprs != NULL) {
      int32_t    num = 0;
      SExprInfo* pSExpr = NULL;
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
      QUERY_CHECK_CODE(code, lino, _error);

      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
      QUERY_CHECK_CODE(code, lino, _error);
    }

    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
    initResultSizeInfo(&pOperator->resultInfo, 512);
    code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
    QUERY_CHECK_CODE(code, lino, _error);

    int32_t    num = 0;
    SExprInfo* pExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
    QUERY_CHECK_CODE(code, lino, _error);
    // NOTE(review): unlike the other initAggSup() calls in this file, no
    // stream state or function store is passed here - confirm this is intended.
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, 0, 0);
    QUERY_CHECK_CODE(code, lino, _error);

    nodesWalkExprs(pPhynode->window.pFuncs, extWindowHasCountLikeFunc, &pExtW->hasCountFunc);
    if (pExtW->hasCountFunc) {
      code = extWindowCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
      QUERY_CHECK_CODE(code, lino, _error);
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
    } else {
      qDebug("%s ext window doesn't have CountLikeFunc", pOperator->pTaskInfo->id.str);
    }

    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
    QUERY_CHECK_CODE(code, lino, _error);
  } else {
    // indefinite-rows mode
    size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

    if (pPhynode->window.pExprs != NULL) {
      int32_t    num = 0;
      SExprInfo* pSExpr = NULL;
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
      QUERY_CHECK_CODE(code, lino, _error);

      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
      QUERY_CHECK_CODE(code, lino, _error);
    }

    initResultSizeInfo(&pOperator->resultInfo, 512);
    code = blockDataEnsureCapacity(pResBlock, 512);
    TSDB_CHECK_CODE(code, lino, _error);

    int32_t    numOfExpr = 0;
    SExprInfo* pExprInfo = NULL;
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
    TSDB_CHECK_CODE(code, lino, _error);

    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
                      pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
    TSDB_CHECK_CODE(code, lino, _error);
    pOperator->exprSupp.hasWindowOrGroup = false;

    code = setFunctionResultOutput(pOperator, &pExtW->binfo, &pExtW->aggSup, MAIN_SCAN, numOfExpr);
    TSDB_CHECK_CODE(code, lino, _error);

    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                              pTaskInfo->pStreamRuntimeInfo);
    TSDB_CHECK_CODE(code, lino, _error);

    pExtW->binfo.pRes = pResBlock;
    // this mode takes the ts orders from the plan node
    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
    TSDB_CHECK_CODE(code, lino, _error);
  }

  initResultRowInfo(&pExtW->binfo.resultRowInfo);

  pOperator->fpSet = createOperatorFpSet(doOpenExternalWindow, externalWindowNext, NULL, destroyExternalWindowOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
  code = appendDownstream(pOperator, &pDownstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrOut = pOperator;
  return code;

_error:

  if (pExtW != NULL) {
    destroyExternalWindowOperatorInfo(pExtW);
  }

  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
551

552
/*
 * Return the raw timestamp array of the primary timestamp column.
 *
 * pBlock          input block
 * primaryTsIndex  index of the timestamp column in pBlock->pDataBlock
 * pTaskInfo       task used for error reporting
 *
 * Returns NULL when the block has no column array or its data is not loaded,
 * otherwise the column's pData. When the first timestamp is non-zero and the
 * block's window is still {0, 0}, the window is refreshed from the column.
 *
 * Does not return on failure: the error is stored in pTaskInfo->code and
 * control leaves through T_LONG_JMP.
 */
int64_t* extractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
  if (!pColDataInfo) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
  // Nothing to inspect in an empty block or a column without a data buffer;
  // the reads of tsCols[0] and tsCols[rows - 1] below would be out of bounds.
  if (tsCols == NULL || pBlock->info.rows <= 0) {
    return tsCols;
  }

  if (tsCols[0] == 0) {
    qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
          tsCols[pBlock->info.rows - 1]);
  }

  if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
    int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
580

581
/* Read the current window index kept in the task's stream runtime info. */
static int32_t getExtWinCurIdx(SExecTaskInfo* pTaskInfo) {
  int32_t curWinIdx = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  return curWinIdx;
}
584

585
/* Advance the current window index in the task's stream runtime info by one. */
static void incExtWinCurIdx(SOperatorInfo* pOperator) {
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx += 1;
}
234✔
589

590
/* Store idx as the current window index in the task's stream runtime info. */
static void setExtWinCurIdx(SOperatorInfo* pOperator, int32_t idx) {
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
}
263,672✔
594

595

596
/* Advance the output window index (curOutIdx) in the task's stream runtime info by one. */
static void incExtWinOutIdx(SOperatorInfo* pOperator) {
  pOperator->pTaskInfo->pStreamRuntimeInfo->funcInfo.curOutIdx += 1;
}
×
600

601
/*
 * Find the first window whose half-open range [skey, ekey) contains ts.
 *
 * On a match the window's position is recorded as the current window index
 * and the window is returned; NULL is returned when no window contains ts.
 * Windows are scanned front to back.
 */
static const STimeWindow* getExtWindow(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts) {
  // TODO handle desc order
  int32_t winIdx = 0;
  while (winIdx < pExtW->pWins->size) {
    const STimeWindow* pCandidate = taosArrayGet(pExtW->pWins, winIdx);
    if (ts >= pCandidate->skey && ts < pCandidate->ekey) {
      setExtWinCurIdx(pOperator, winIdx);
      return pCandidate;
    }
    ++winIdx;
  }
  return NULL;
}
612

613
/*
 * Return the window that follows the current window index, or NULL when the
 * current window is the last one. The current index itself is not changed.
 */
static const STimeWindow* getExtNextWindow(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
  int32_t nextIdx = getExtWinCurIdx(pTaskInfo) + 1;
  if (nextIdx >= pExtW->pWins->size) {
    return NULL;
  }
  return taosArrayGet(pExtW->pWins, nextIdx);
}
618

619
/*
 * Locate the first row after lastEndPos whose timestamp is >= win.skey.
 *
 * win         window to position; ekey is treated as exclusive
 * pBlockInfo  info of the block that tsCol belongs to (window and row count)
 * lastEndPos  index of the last row already consumed
 * order       scan order; only TSDB_ORDER_ASC is handled
 * nextPos     receives the row index when 0 is returned
 * tsCol       timestamp column of the block
 *
 * Returns  0  a row was found and *nextPos is set
 *         -1  the window starts after the end of the block, or no remaining
 *             row reaches win.skey
 *         -2  the window ends at or before the block start, or at or before
 *             the next candidate row
 */
static int32_t getNextStartPos(STimeWindow win, const SDataBlockInfo* pBlockInfo, int32_t lastEndPos, int32_t order, int32_t* nextPos, int64_t* tsCol) {
  // Descending scans are not implemented. The original loop never terminated
  // for them and walked past the end of tsCol, so report "nothing found".
  if (order != TSDB_ORDER_ASC || tsCol == NULL) {
    return -1;
  }

  if (win.ekey <= pBlockInfo->window.skey) {
    return -2;
  }
  if (win.skey > pBlockInfo->window.ekey) {
    return -1;
  }

  // Bound the scan by the number of rows in the block so that tsCol is never
  // read out of range.
  for (int64_t pos = (int64_t)lastEndPos + 1; pos < pBlockInfo->rows; ++pos) {
    if (win.ekey <= tsCol[pos]) {
      return -2;
    }
    if (win.skey <= tsCol[pos]) {
      *nextPos = (int32_t)pos;
      return 0;
    }
  }
  return -1;
}
}
639

640
static int32_t setExtWindowOutputBuf(SResultRowInfo* pResultRowInfo, const STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
263,340✔
641
                                     int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
642
                                     SExecTaskInfo* pTaskInfo) {
643
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
263,340✔
644
                                                  true, tableGroupId, pTaskInfo, true, pAggSup, true);
645

646
  if (pResultRow == NULL || pTaskInfo->code != 0) {
263,340!
647
    *pResult = NULL;
×
648
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
649
    return pTaskInfo->code;
×
650
  }
651

652
  qDebug("current result rows num:%d", tSimpleHashGetSize(pAggSup->pResultRowHashTable));
263,340✔
653

654
  // set time window for current result
655
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
263,340✔
656
  *pResult = pResultRow;
263,340✔
657
  pTaskInfo->code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
263,340✔
658
  if (pTaskInfo->code) {
263,340!
659
    *pResult = NULL;
×
660
    qError("failed to set result row ctx, error:%s", tstrerror(pTaskInfo->code));
×
661
    return pTaskInfo->code;
×
662
  }
663

664
  return TSDB_CODE_SUCCESS;
263,340✔
665
}
666

667
static int32_t extWindowDoHashAgg(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
263,340✔
668
                                  SSDataBlock* pInputBlock) {
669
  if (forwardRows == 0) return 0;
263,340!
670
  SExprSupp*               pSup = &pOperator->exprSupp;
263,340✔
671
  SExternalWindowOperator* pExtW = pOperator->info;
263,340✔
672
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
263,340✔
673
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
263,340✔
674
}
675

676
/*
 * Get an output block for the window at winIdx by appending a new block to
 * that window's block list.
 *
 * Returns the block, or NULL on failure with the error left in terrno.
 */
static SSDataBlock* extWindowGetOutputBlock(SOperatorInfo* pOperator, int32_t winIdx) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SSDataBlock*             pBlock = NULL;
  SBlockList*              pBlockList = taosArrayGet(pExtW->pOutputBlocks, winIdx);
  if (pBlockList == NULL) {
    // No list for this window index; the error of the failed lookup is left
    // in terrno for the caller, as at the other taosArrayGet() call sites.
    return NULL;
  }

  int32_t code = blockListGetLastBlock(pBlockList, &pBlock);
  if (code != 0) terrno = code;
  return pBlock;
}
684

685
/*
 * Project forwardRows rows of pInputBlock, starting at startPos, into a new
 * output block of the current window.
 *
 * The rows are first copied into the operator's scratch block (created on
 * first use, cleaned on later calls) and the scalar expressions are then
 * applied from the scratch block into the output block.
 *
 * Returns 0 on success or when forwardRows is 0, otherwise the error code of
 * the step that failed.
 */
static int32_t extWindowCopyRows(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos,
                                 int32_t forwardRows) {
  if (forwardRows == 0) return 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SSDataBlock*             pResBlock = extWindowGetOutputBlock(pOperator, getExtWinCurIdx(pOperator->pTaskInfo));
  SExprSupp*               pExprSup = &pExtW->scalarSupp;

  if (!pResBlock) {
    qError("%s failed to get output block for ext window:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
    return terrno;
  }

  int32_t code = 0;
  if (!pExtW->pTmpBlock) {
    code = createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock);
    if (code) {
      // log the code that is actually returned rather than terrno
      qError("%s failed to create datablock:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  } else {
    blockDataCleanup(pExtW->pTmpBlock);
  }

  code = blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, forwardRows));
  if (code) {
    qError("%s failed to ensure capacity:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    return code;
  }

  if (forwardRows > 0) {
    code = blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, forwardRows);
    if (code == 0) {
      code = projectApplyFunctionsWithSelect(pExprSup->pExprInfo, pResBlock, pExtW->pTmpBlock, pExprSup->pCtx, pExprSup->numOfExprs,
          NULL, GET_STM_RTINFO(pOperator->pTaskInfo), true);
    }
  }

  return code;
}
723

724
/*
 * Scalar (projection) mode: walks pInputBlock window by window and hands each
 * slice of rows to extWindowCopyRows for the window that is current at that
 * point.
 *
 * The first window is looked up from the block's start timestamp; if none is
 * found the block is ignored and 0 is returned. For every window the slice
 * length is obtained from getNumOfRowsInTimeWindow with the bound
 * (ekey - 1) for ascending input or (skey - 1) otherwise. The walk stops when
 * the block is exhausted, when there is no next window, or on the first error.
 *
 * Returns 0 on success, otherwise the error code from extWindowCopyRows.
 */
static int32_t hashExternalWindowProject(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SExternalWindowOperator* pExtW = pOperator->info;
  int64_t*                 tsCol = extractTsCol(pInputBlock, pExtW->primaryTsIndex, pTaskInfo);
  TSKEY                    firstTs = getStartTsKey(&pInputBlock->info.window, tsCol);
  const STimeWindow*       pWin = getExtWindow(pOperator, pExtW, firstTs);
  const int32_t            tsOrder = pExtW->binfo.inputTsOrder;
  const bool               ascScan = (tsOrder == TSDB_ORDER_ASC);

  if (pWin == NULL) {
    return 0;
  }

  int32_t code = 0;
  int32_t rowOffset = 0;

  for (;;) {
    TSKEY   boundKey = ascScan ? pWin->ekey : pWin->skey;
    int32_t sliceRows = getNumOfRowsInTimeWindow(&pInputBlock->info, tsCol, rowOffset, boundKey - 1,
                                                 binarySearchForKey, NULL, tsOrder);

    code = extWindowCopyRows(pOperator, pInputBlock, rowOffset, sliceRows);
    if (code != 0 || pInputBlock->info.rows <= rowOffset + sliceRows) {
      break;  // failed, or every row of the block has been consumed
    }

    pWin = getExtNextWindow(pExtW, pTaskInfo);
    if (pWin == NULL) {
      break;
    }

    incExtWinCurIdx(pOperator);
    rowOffset += sliceRows;
  }

  return code;
}
756

757
/*
 * Applies the operator's expressions (pOperator->exprSupp) to pBlock and
 * writes the result into pRes.
 *
 * Steps, each aborting to _exit on failure:
 *   1. if scalar expressions are configured, apply them with pBlock passed as
 *      both block arguments of projectApplyFunctions;
 *   2. bind pBlock as the input of the operator's expression contexts;
 *   3. grow pRes so it can hold its current rows plus all rows of pBlock;
 *   4. apply the operator's expressions from pBlock into pRes.
 *
 * Returns TSDB_CODE_SUCCESS or the failing step's error code (also logged).
 */
static int32_t extWindowDoIndefRows(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExprSupp*               pFuncSup = &pOperator->exprSupp;
  SExprSupp*               pScalarSup = &pExtW->scalarSupp;
  const int32_t            tsOrder = pExtW->binfo.inputTsOrder;
  const int32_t            scanFlag = pBlock->info.scanFlag;

  if (NULL != pScalarSup->pExprInfo) {
    TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx,
                                          pScalarSup->numOfExprs, pExtW->pPseudoColInfo,
                                          GET_STM_RTINFO(pOperator->pTaskInfo)));
  }

  TAOS_CHECK_EXIT(setInputDataBlock(pFuncSup, pBlock, tsOrder, scanFlag, false));
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));
  TAOS_CHECK_EXIT(projectApplyFunctions(pFuncSup->pExprInfo, pRes, pBlock, pFuncSup->pCtx, pFuncSup->numOfExprs,
                                        pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
787

788

789
/*
 * Runs the operator's expressions over a slice of the input block and appends
 * the result to the output block of the external window currently selected on
 * the task (getExtWinCurIdx).
 *
 * The slice [startPos, startPos + forwardRows) is copied into the operator's
 * reusable scratch block (pExtW->pTmpBlock, created on first use and cleaned
 * on every later call) and then passed to extWindowDoIndefRows.
 *
 * Returns 0 on success and when forwardRows is 0 (nothing to do); otherwise the
 * error code of the step that failed.
 */
static int32_t extWindowHandleIndefRows(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos,
                                 int32_t forwardRows) {
  if (forwardRows == 0) return 0;

  SExternalWindowOperator* pExtW = pOperator->info;
  SSDataBlock*             pResBlock = extWindowGetOutputBlock(pOperator, getExtWinCurIdx(pOperator->pTaskInfo));

  if (!pResBlock) {
    // extWindowGetOutputBlock stores its failure code in terrno
    qError("%s failed to get output block for ext window:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
    return terrno;
  }

  int32_t rowsToCopy = forwardRows;
  int32_t code = 0;

  if (!pExtW->pTmpBlock) {
    code = createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock);
    if (code) {
      // log the code that is actually returned rather than a possibly stale terrno
      qError("%s failed to create datablock:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  } else {
    blockDataCleanup(pExtW->pTmpBlock);
  }

  code = blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rowsToCopy));
  if (code) {
    qError("%s failed to ensure capacity:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    return code;
  }

  // a negative row count is treated as "nothing to copy"
  if (rowsToCopy > 0) {
    code = blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rowsToCopy);
    if (code == 0) {
      code = extWindowDoIndefRows(pOperator, pResBlock, pExtW->pTmpBlock);
    }
  }

  return code;
}
826

827

828
/*
 * Indefinite-rows function mode: splits pInputBlock across consecutive
 * external windows and passes each slice to extWindowHandleIndefRows.
 *
 * The starting window is resolved from the block's start timestamp; when no
 * window is found the block is skipped and 0 is returned. Slice lengths come
 * from getNumOfRowsInTimeWindow using (ekey - 1) as the bound for ascending
 * input and (skey - 1) otherwise. Processing ends when all rows are consumed,
 * when no further window exists, or at the first error.
 *
 * Returns 0 on success, otherwise the error from extWindowHandleIndefRows.
 */
static int32_t hashExternalWindowIndefRows(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  int64_t*                 tsCol = extractTsCol(pInputBlock, pExtW->primaryTsIndex, pTaskInfo);
  TSKEY                    blockStartTs = getStartTsKey(&pInputBlock->info.window, tsCol);
  const STimeWindow*       pWin = getExtWindow(pOperator, pExtW, blockStartTs);
  const int32_t            tsOrder = pExtW->binfo.inputTsOrder;
  const bool               ascScan = (TSDB_ORDER_ASC == tsOrder);

  if (!pWin) return 0;

  int32_t consumed = 0;  // rows of the block already assigned to earlier windows
  int32_t code = 0;

  do {
    TSKEY   limitKey = ascScan ? pWin->ekey : pWin->skey;
    int32_t nRows = getNumOfRowsInTimeWindow(&pInputBlock->info, tsCol, consumed, limitKey - 1, binarySearchForKey,
                                             NULL, tsOrder);

    code = extWindowHandleIndefRows(pOperator, pInputBlock, consumed, nRows);
    if (code != 0) break;
    if (pInputBlock->info.rows <= consumed + nRows) break;

    pWin = getExtNextWindow(pExtW, pTaskInfo);
    if (pWin) {
      incExtWinCurIdx(pOperator);
      consumed += nRows;
    }
  } while (pWin);

  return code;
}
861

862

863
/*
 * Produces result rows for external windows that received no input rows.
 *
 * Windows with index (pExtW->lastWinId + 1) .. endIdx are processed, where
 * endIdx is the last window when allRemains is true and (current index - 1)
 * otherwise. Each of them is aggregated over one row of
 * pExtW->pEmptyInputBlock; pExtW->lastWinId is advanced as windows are done.
 * Nothing is aggregated when pExtW->pEmptyInputBlock is NULL.
 *
 * On success the expression contexts are re-bound to pBlock (if non-NULL) and,
 * unless allRemains is set, the task's current window index is restored to the
 * value it had on entry.
 *
 * Returns 0 on success, otherwise the failing step's error code (logged).
 */
static int32_t extWindowProcessEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains) {
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
  SExprSupp*               pSup = &pOperator->exprSupp;
  const bool               ascScan = (pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC);
  const int32_t            savedIdx = getExtWinCurIdx(pOperator->pTaskInfo);

  if (pExtW->pEmptyInputBlock != NULL) {
    SSDataBlock*    pEmpty = pExtW->pEmptyInputBlock;
    SResultRowInfo* pRowInfo = &pExtW->binfo.resultRowInfo;
    SResultRow*     pRow = NULL;
    int32_t         lastIdx = allRemains ? (pExtW->pWins->size - 1) : (savedIdx - 1);
    int32_t         firstIdx = pExtW->lastWinId + 1;

    if (firstIdx <= lastIdx) {
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pExtW->pEmptyInputBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
    }

    for (int32_t winIdx = firstIdx; winIdx <= lastIdx; ++winIdx) {
      const STimeWindow* pWin = taosArrayGet(pExtW->pWins, winIdx);

      setExtWinCurIdx(pOperator, winIdx);
      qDebug("%s %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
             GET_TASKID(pOperator->pTaskInfo), winIdx, pWin->skey, pWin->ekey, ascScan);

      TAOS_CHECK_EXIT(setExtWindowOutputBuf(pRowInfo, pWin, &pRow, pEmpty->info.id.groupId, pSup->pCtx,
                                            pSup->numOfExprs, pSup->rowEntryInfoOffset, &pExtW->aggSup,
                                            pOperator->pTaskInfo));

      updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, pWin, 1);
      code = extWindowDoHashAgg(pOperator, 0, 1, pEmpty);
      pExtW->lastWinId = winIdx;  // recorded before the result of the aggregation is checked
      TAOS_CHECK_EXIT(code);
    }
  }

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
    return code;
  }

  if (pBlock) {
    // a failure here jumps back to _exit, where it is logged and returned
    TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
  }

  if (!allRemains) {
    setExtWinCurIdx(pOperator, savedIdx);
  }

  return code;
}
916

917
/*
 * Aggregation mode: feeds the rows of pInputBlock into the result rows of the
 * external windows they belong to.
 *
 * Flow:
 *   - resolve the window for the block's start timestamp; if there is none the
 *     function logs an error and returns without touching the block;
 *   - fill in the empty windows that precede the current one;
 *   - apply the optional scalar expressions in place on pInputBlock;
 *   - aggregate the rows of the first window, then keep advancing to the next
 *     window until getExtNextWindow yields none or getNextStartPos returns -1
 *     ("ignore current block"); a return of -2 ("skip current window") only
 *     advances the window index.
 *
 * Every failure is raised with T_LONG_JMP on pTaskInfo->env; the function
 * itself returns nothing.
 */
static void hashExternalWindowAgg(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*               pFuncSup = &pOperator->exprSupp;
  SExprSupp*               pScalarSup = &pExtW->scalarSupp;
  SResultRowInfo*          pRowInfo = &pExtW->binfo.resultRowInfo;
  SResultRow*              pRow = NULL;
  const int32_t            numOfExprs = pFuncSup->numOfExprs;
  int32_t                  rowPos = 0;
  int32_t                  ret = 0;

  int64_t*   tsCols = extractTsCol(pInputBlock, pExtW->primaryTsIndex, pTaskInfo);
  const bool ascScan = (pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY      ts = getStartTsKey(&pInputBlock->info.window, tsCols);

  const STimeWindow* pWin = getExtWindow(pOperator, pExtW, ts);
  if (NULL == pWin) {
    qError("%s failed to get time window for ts:%" PRId64 ", error:%s", GET_TASKID(pOperator->pTaskInfo), ts, tstrerror(terrno));
    return;
  }

  ret = extWindowProcessEmptyWins(pOperator, pInputBlock, false);
  if (ret != 0) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  qDebug("%s ext window1 start:%" PRId64 ", end:%" PRId64 ", ts:%" PRId64 ", ascScan:%d",
         GET_TASKID(pOperator->pTaskInfo), pWin->skey, pWin->ekey, ts, ascScan);

  if (NULL != pScalarSup->pExprInfo) {
    ret = projectApplyFunctions(pScalarSup->pExprInfo, pInputBlock, pInputBlock, pScalarSup->pCtx,
                                pScalarSup->numOfExprs, pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
    if (ret != 0) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  // work on a private copy of the window from here on
  STimeWindow win = *pWin;

  ret = setExtWindowOutputBuf(pRowInfo, &win, &pRow, pInputBlock->info.id.groupId, pFuncSup->pCtx, numOfExprs,
                              pFuncSup->rowEntryInfoOffset, &pExtW->aggSup, pTaskInfo);
  if (ret != 0 || NULL == pRow) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  TSKEY   boundKey = ascScan ? win.ekey : win.skey;
  int32_t rowsInWin = getNumOfRowsInTimeWindow(&pInputBlock->info, tsCols, rowPos, boundKey - 1, binarySearchForKey,
                                               NULL, pExtW->binfo.inputTsOrder);

  updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &win, 1);
  ret = extWindowDoHashAgg(pOperator, rowPos, rowsInWin, pInputBlock);
  pExtW->lastWinId = getExtWinCurIdx(pTaskInfo);  // recorded before the result of the aggregation is checked
  if (ret != 0) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  for (;;) {
    int32_t prevEndPos = rowsInWin + rowPos - 1;
    if (prevEndPos >= pInputBlock->info.rows) {
      break;
    }

    pWin = getExtNextWindow(pExtW, pTaskInfo);
    if (NULL == pWin) {
      break;
    }
    win = *pWin;

    qDebug("%s ext window2 start:%" PRId64 ", end:%" PRId64 ", ts:%" PRId64 ", ascScan:%d",
           GET_TASKID(pOperator->pTaskInfo), win.skey, win.ekey, ts, ascScan);

    int32_t nextPosGot =
        getNextStartPos(win, &pInputBlock->info, prevEndPos, pExtW->binfo.inputTsOrder, &rowPos, tsCols);
    if (nextPosGot == -1) {
      qDebug("%s ignore current block", GET_TASKID(pOperator->pTaskInfo));
      break;
    }
    if (nextPosGot == -2) {
      qDebug("%s skip current window", GET_TASKID(pOperator->pTaskInfo));
      incExtWinCurIdx(pOperator);
      continue;
    }

    incExtWinCurIdx(pOperator);

    ret = extWindowProcessEmptyWins(pOperator, pInputBlock, false);
    if (ret != 0) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    boundKey = ascScan ? win.ekey : win.skey;
    rowsInWin = getNumOfRowsInTimeWindow(&pInputBlock->info, tsCols, rowPos, boundKey - 1, binarySearchForKey, NULL,
                                         pExtW->binfo.inputTsOrder);

    ret = setExtWindowOutputBuf(pRowInfo, &win, &pRow, pInputBlock->info.id.groupId, pFuncSup->pCtx, numOfExprs,
                                pFuncSup->rowEntryInfoOffset, &pExtW->aggSup, pTaskInfo);
    if (ret != 0 || NULL == pRow) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &win, 1);
    ret = extWindowDoHashAgg(pOperator, rowPos, rowsInWin, pInputBlock);
    pExtW->lastWinId = getExtWinCurIdx(pTaskInfo);
    if (ret != 0) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }
}
1021

1022

1023
/*
 * Open function of the external window operator: consumes the whole
 * downstream and prepares the results that externalWindowNext hands out.
 *
 * On the first call (pExtW->pWins == NULL) the window list and one output
 * block list per window are built from the stream runtime's
 * pStreamPesudoFuncVals entries; the window end key is incremented by one
 * unless the trigger type is STREAM_TRIGGER_SLIDING.
 *
 * Each downstream block is then dispatched by pExtW->mode (scalar projection,
 * aggregation, or indefinite-rows functions). In aggregation mode the
 * remaining empty windows are processed once the downstream is exhausted and
 * the grouped result info is initialised for dumping.
 *
 * Returns TSDB_CODE_SUCCESS. On any failure the code is stored in
 * pTaskInfo->code and raised with T_LONG_JMP on pTaskInfo->env.
 */
static int32_t doOpenExternalWindow(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) return TSDB_CODE_SUCCESS;

  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SExternalWindowOperator* pExtW = pOperator->info;
  const int32_t            blockRowNumThreshold = 4096;  // row threshold used for every per-window block list

  pTaskInfo->pStreamRuntimeInfo->funcInfo.extWinProjMode = (pExtW->mode != EEXT_MODE_AGG);

  if (!pExtW->pWins) {
    size_t size = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);

    // TSDB_CHECK_NULL stores terrno into `code` before jumping, so an
    // allocation failure is reported at _exit instead of returning success.
    pExtW->pWins = taosArrayInit(size, sizeof(STimeWindow));
    TSDB_CHECK_NULL(pExtW->pWins, code, lino, _exit, terrno);
    pExtW->pOutputBlocks = taosArrayInit(size, sizeof(SBlockList));
    TSDB_CHECK_NULL(pExtW->pOutputBlocks, code, lino, _exit, terrno);

    for (size_t i = 0; i < size; ++i) {
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
      STimeWindow         win = {.skey = pParam->wstart, .ekey = pParam->wend};
      if (pTaskInfo->pStreamRuntimeInfo->funcInfo.triggerType != STREAM_TRIGGER_SLIDING) {
        win.ekey++;
      }
      TSDB_CHECK_NULL(taosArrayPush(pExtW->pWins, &win), code, lino, _exit, terrno);

      SBlockList bl = {.pSrcBlock = pExtW->binfo.pRes, .pBlocks = NULL, .blockRowNumThreshold = blockRowNumThreshold};
      code = blockListInit(&bl, blockRowNumThreshold);
      QUERY_CHECK_CODE(code, lino, _exit);
      TSDB_CHECK_NULL(taosArrayPush(pExtW->pOutputBlocks, &bl), code, lino, _exit, terrno);
    }

    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
    pExtW->lastWinId = -1;  // no window has produced a result yet
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      if (EEXT_MODE_AGG == pExtW->mode) {
        // downstream exhausted: process all remaining empty windows
        TAOS_CHECK_EXIT(extWindowProcessEmptyWins(pOperator, pBlock, true));
      }
      break;
    }

    printDataBlock(pBlock, __func__, "externalwindow");

    qDebug("ext windowpExtW->mode:%d", pExtW->mode);
    switch (pExtW->mode) {
      case EEXT_MODE_SCALAR:
        TAOS_CHECK_EXIT(hashExternalWindowProject(pOperator, pBlock));
        break;
      case EEXT_MODE_AGG:
        // reports failures by long-jumping on pTaskInfo->env
        hashExternalWindowAgg(pOperator, pBlock);
        break;
      case EEXT_MODE_INDEFR_FUNC:
        TAOS_CHECK_EXIT(hashExternalWindowIndefRows(pOperator, pBlock));
        break;
      default:
        break;
    }
  }

  OPTR_SET_OPENED(pOperator);

  if (pExtW->mode == EEXT_MODE_AGG) {
    qDebug("ext window before dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));

    code = initGroupedResultInfo(&pExtW->groupResInfo, pExtW->aggSup.pResultRowHashTable, pExtW->binfo.inputTsOrder);
    QUERY_CHECK_CODE(code, lino, _exit);

    qDebug("ext window after dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
  }

_exit:

  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1108

1109
/*
 * Next function of the external window operator: returns the next non-empty
 * result block in *ppRes, or NULL when there is nothing (more) to return.
 *
 * The operator is opened through fpSet._openFn on every call. Afterwards:
 *   - in scalar and indefinite-rows modes the per-window output block lists
 *     are walked in window order, starting at pExtW->outputWinId; each call
 *     merges one stored block into the result block;
 *   - in aggregation mode result rows are dumped from the group result info,
 *     and the operator is marked completed once no results remain.
 *
 * Returns 0 on success. On failure the code is stored in pTaskInfo->code and
 * raised with T_LONG_JMP on pTaskInfo->env.
 *
 * NOTE: per-window limit/offset handling is not applied on this path.
 */
static int32_t externalWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SExternalWindowOperator* pExtW = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return code;
  }

  SSDataBlock* pBlock = pExtW->binfo.pRes;
  blockDataCleanup(pBlock);
  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  while (1) {
    if (pExtW->mode == EEXT_MODE_SCALAR || pExtW->mode == EEXT_MODE_INDEFR_FUNC) {
      if (pExtW->outputWinId >= taosArrayGetSize(pExtW->pOutputBlocks)) {
        // all windows have been emitted
        pBlock->info.rows = 0;
        break;
      }

      SBlockList* pList = taosArrayGet(pExtW->pOutputBlocks, pExtW->outputWinId);
      if (pExtW->pOutputBlockListNode == NULL) {
        // first visit of this window: start at the head of its block list
        pExtW->pOutputBlockListNode = tdListGetHead(pList->pBlocks);
      } else {
        if (!pExtW->pOutputBlockListNode->dl_next_) {
          // the list of this window is exhausted, move on to the next window
          pExtW->pOutputBlockListNode = NULL;
          pExtW->outputWinId++;
          incExtWinOutIdx(pOperator);
          continue;
        }
        pExtW->pOutputBlockListNode = pExtW->pOutputBlockListNode->dl_next_;
      }

      if (pExtW->pOutputBlockListNode) {
        code = blockDataMerge(pBlock, *(SSDataBlock**)pExtW->pOutputBlockListNode->data);
        // go through the check macro so that the failing line is recorded in lino
        QUERY_CHECK_CODE(code, lino, _end);
      } else {
        // the window has no blocks at all
        pExtW->outputWinId++;
        incExtWinOutIdx(pOperator);
        continue;
      }
      break;
    } else {
      doBuildResultDatablock(pOperator, &pExtW->binfo, &pExtW->groupResInfo, pExtW->aggSup.pResultBuf);
      bool hasRemain = hasRemainResults(&pExtW->groupResInfo);
      if (!hasRemain) {
        setOperatorCompleted(pOperator);
        break;
      }
      if (pExtW->binfo.pRes->info.rows > 0) break;
    }
  }

  pOperator->resultInfo.totalRows += pExtW->binfo.pRes->info.rows;

_end:

  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (pExtW->binfo.pRes->info.rows == 0) ? NULL : pExtW->binfo.pRes;

  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM && (*ppRes)) {
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
  }

  return code;
}
1195

1196
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
×
1197
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
1198
  if (*pResult == NULL) {
×
1199
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
×
1200
    if (!*pResult) {
×
1201
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
1202
      return terrno;
×
1203
    }
1204
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
×
1205
  }
1206
  (*pResult)->win = *pWin;
×
1207
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
×
1208
}
1209

1210
/*
 * Merge-aligned aggregation: aggregates pBlock into the operator's single
 * result row, emitting one finalized row per external window.
 *
 * The window of the block's start timestamp is looked up first; a missing
 * window is raised as TSDB_CODE_INVALID_PARA. If a previous block left an
 * open window (pMlExtInfo->curTs) whose start differs from the new window's,
 * that row is finalized and reset before continuing.
 *
 * Rows are then scanned in order. Every time a row's timestamp differs from
 * pMlExtInfo->curTs, the rows gathered so far are aggregated and finalized
 * into pResultBlock, and the next window becomes current. The trailing run
 * of rows is aggregated but left open so that a later block or the caller can
 * finalize it.
 *
 * Returns 0; all failures are raised with T_LONG_JMP on pTaskInfo->env.
 */
static int32_t doMergeAlignExtWindowAgg(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                           pFuncSup = &pOperator->exprSupp;
  int32_t                              code = 0;
  int32_t                              segStart = 0;  // first row of the run not yet aggregated

  int64_t* tsCols = extractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
  TSKEY    firstTs = getStartTsKey(&pBlock->info.window, tsCols);

  const STimeWindow* pWin = getExtWindow(pOperator, pExtW, firstTs);
  if (NULL == pWin) {
    qError("failed to get time window for ts:%" PRId64 ", index:%d, error:%s", firstTs, pExtW->primaryTsIndex,
           tstrerror(terrno));
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_INVALID_PARA);
  }

  // a window left open by an earlier block ends here if this block starts a different one
  if (pMlExtInfo->curTs != INT64_MIN && pMlExtInfo->curTs != pWin->skey) {
    finalizeResultRows(pExtW->aggSup.pResultBuf, &pExtW->binfo.resultRowInfo.cur, pFuncSup, pExtW->binfo.pRes, pTaskInfo);
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
  }

  code = setSingleOutputTupleBuf(pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pFuncSup, &pExtW->aggSup);
  if (code != 0 || NULL == pMlExtInfo->pResultRow) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  pMlExtInfo->curTs = pWin->skey;

  int32_t rowIdx;
  for (rowIdx = segStart + 1; rowIdx < pBlock->info.rows; ++rowIdx) {
    if (tsCols[rowIdx] == pMlExtInfo->curTs) {
      continue;  // still inside the current run
    }

    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
      pMlExtInfo->curTs, segStart, rowIdx, tsCols[rowIdx]); 
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pFuncSup->pCtx, &pExtW->twAggSup.timeWindowData, segStart,
                                           rowIdx - segStart, pBlock->info.rows, pFuncSup->numOfExprs);
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, pFuncSup, pResultBlock, pTaskInfo);
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));

    pWin = getExtNextWindow(pExtW, pTaskInfo);
    if (NULL == pWin) {
      break;
    }
    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
    incExtWinCurIdx(pOperator);
    segStart = rowIdx;

    code = setSingleOutputTupleBuf(pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pFuncSup, &pExtW->aggSup);
    if (code != 0 || NULL == pMlExtInfo->pResultRow) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    pMlExtInfo->curTs = pWin->skey;
  }

  // aggregate the trailing run; it is not finalized here
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pFuncSup->pCtx, &pExtW->twAggSup.timeWindowData, segStart,
                                         rowIdx - segStart, pBlock->info.rows, pFuncSup->numOfExprs);
  if (code != 0) {
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1272

1273
/*
 * Merge-aligned scalar mode: applies the external window operator's scalar
 * expressions (scalarSupp) from pBlock into pResultBlock.
 *
 * pOperator is the merge-aligned operator, so its info is a
 * SMergeAlignedExternalWindowOperator; the external window state has to be
 * reached through its pExtW member, exactly as doMergeAlignExternalWindow and
 * doMergeAlignExtWindowAgg do. Casting pOperator->info directly to
 * SExternalWindowOperator would read the expression support from the wrong
 * structure.
 *
 * pResultRowInfo is unused on this path. Returns the result of
 * projectApplyFunctions.
 */
static int32_t doMergeAlignExtWindowProject(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                            SSDataBlock* pResultBlock) {
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  SExprSupp*                           pExprSup = &pExtW->scalarSupp;

  (void)pResultRowInfo;

  int32_t code = projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
                        GET_STM_RTINFO(pOperator->pTaskInfo));
  return code;
}
1281

1282
/*
 * Drives the merge-aligned external window operator: pulls every block from
 * the downstream and processes it either as a scalar projection
 * (EEXT_MODE_SCALAR) or as an aligned aggregation (any other mode), writing
 * into pExtW->binfo.pRes.
 *
 * When the downstream is exhausted, the window that is still open in
 * aggregation mode (pMlExtInfo->curTs != INT64_MIN) is finalized and the
 * operator is marked completed.
 *
 * Any failure, including one returned by the per-block processing, is stored
 * in pTaskInfo->code and raised with T_LONG_JMP on pTaskInfo->env.
 *
 * NOTE: limit/offset handling is not applied on this path.
 */
void doMergeAlignExternalWindow(SOperatorInfo* pOperator) {
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
  int32_t                              code = 0;
  SSDataBlock*                         pRes = pExtW->binfo.pRes;
  SExprSupp*                           pSup = &pOperator->exprSupp;
  int32_t                              lino = 0;

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);

    if (pBlock == NULL) {
      // close last time window
      if (pMlExtInfo->curTs != INT64_MIN && EEXT_MODE_AGG == pExtW->mode) {
        finalizeResultRows(pMlExtInfo->pExtW->aggSup.pResultBuf, &pExtW->binfo.resultRowInfo.cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
      }
      setOperatorCompleted(pOperator);
      break;
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    printDataBlock(pBlock, __func__, "externalwindowAlign");
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);

    if (EEXT_MODE_SCALAR == pExtW->mode) {
      code = doMergeAlignExtWindowProject(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes);
    } else {
      code = doMergeAlignExtWindowAgg(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes);
    }
    // without this check the code would be overwritten by the next iteration
    // and the failure silently dropped
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != 0) {
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc