• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4665

12 Aug 2025 07:34AM UTC coverage: 59.901% (-0.6%) from 60.536%
#4665

push

travis-ci

web-flow
Merge pull request #32547 from taosdata/refactor/wangxu/get-started-installer

refactor: get started for installer and docker

137370 of 291999 branches covered (47.04%)

Branch coverage included in aggregate %.

207872 of 284354 relevant lines covered (73.1%)

4846328.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.28
/source/libs/executor/src/externalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22

23
typedef struct SBlockList {
24
  const SSDataBlock* pSrcBlock;
25
  SList*             pBlocks;
26
  int32_t            blockRowNumThreshold;
27
} SBlockList;
28

29
typedef struct SExternalWindowOperator {
30
  SOptrBasicInfo     binfo;
31
  SAggSupporter      aggSup;
32
  SExprSupp          scalarSupp;
33
  STimeWindowAggSupp twAggSup;
34
  SGroupResInfo      groupResInfo;
35
  int32_t            primaryTsIndex;
36
  EExtWinMode        mode;
37
  SArray*            pWins;
38
  SArray*            pOutputBlocks;  // for each window, we have a list of blocks
39
  // SArray*            pOffsetList;  // for each window
40
  int32_t            lastWinId;
41
  int32_t            outputWinId;
42
  SListNode*         pOutputBlockListNode;  // block index in block array used for output
43
  SSDataBlock*       pTmpBlock;
44
  SSDataBlock*       pEmptyInputBlock;
45
  SArray*            pPseudoColInfo;  
46
  bool               hasCountFunc;
47
  // SLimitInfo         limitInfo;  // limit info for each window
48
} SExternalWindowOperator;
49

50
static int32_t blockListInit(SBlockList* pBlockList, int32_t threshold) {
379✔
51
  pBlockList->pBlocks = tdListNew(sizeof(SSDataBlock*));
379✔
52
  if (!pBlockList->pBlocks) {
381!
53
    return terrno;
×
54
  }
55
  return 0;
381✔
56
}
57

58
static int32_t blockListAddBlock(SBlockList* pBlockList) {
×
59
  SSDataBlock* pRes = NULL;
×
60
  int32_t      code = createOneDataBlock(pBlockList->pSrcBlock, false, &pRes);
×
61
  if (code != 0) {
×
62
    return code;
×
63
  }
64
  code = blockDataEnsureCapacity(pRes, pBlockList->blockRowNumThreshold);
×
65
  if (code != 0) {
×
66
    blockDataDestroy(pRes);
×
67
    return code;
×
68
  }
69
  code = tdListAppend(pBlockList->pBlocks, &pRes);
×
70
  if (code != 0) {
×
71
    blockDataDestroy(pRes);
×
72
    return code;
×
73
  }
74
  return 0;
×
75
}
76

77
static int32_t blockListGetLastBlock(SBlockList* pBlockList, SSDataBlock** ppBlock) {
×
78
  SListNode* pNode = TD_DLIST_TAIL(pBlockList->pBlocks);
×
79
  int32_t    code = 0;
×
80
  *ppBlock = NULL;
×
81
  code = blockListAddBlock(pBlockList);
×
82
  if (0 == code) {
×
83
    pNode = TD_DLIST_TAIL(pBlockList->pBlocks);
×
84
    *ppBlock = *(SSDataBlock**)pNode->data;
×
85
  }
86
  return code;
×
87
}
88

89
static void blockListDestroy(void* p) {
381✔
90
  SBlockList* pBlockList = (SBlockList*)p;
381✔
91
  if (TD_DLIST_NELES(pBlockList->pBlocks) > 0) {
381!
92
    SListNode* pNode = TD_DLIST_HEAD(pBlockList->pBlocks);
×
93
    while (pNode) {
×
94
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
95
      blockDataDestroy(pBlock);
×
96
      pNode = pNode->dl_next_;
×
97
    }
98
  }
99
  taosMemoryFree(pBlockList->pBlocks);
381!
100
}
381✔
101

102
void destroyExternalWindowOperatorInfo(void* param) {
117✔
103
  if (NULL == param) {
117!
104
    return;
×
105
  }
106
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
117✔
107
  cleanupBasicInfo(&pInfo->binfo);
117✔
108

109
  taosArrayDestroyEx(pInfo->pOutputBlocks, blockListDestroy);
117✔
110
  taosArrayDestroy(pInfo->pWins);
117✔
111
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
117✔
112
  cleanupGroupResInfo(&pInfo->groupResInfo);
117✔
113
 
114
  taosArrayDestroy(pInfo->pPseudoColInfo);
117✔
115
  blockDataDestroy(pInfo->pTmpBlock);
117✔
116
  blockDataDestroy(pInfo->pEmptyInputBlock);
117✔
117

118
  cleanupAggSup(&pInfo->aggSup);
117✔
119
  cleanupExprSupp(&pInfo->scalarSupp);
117✔
120

121
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
117✔
122

123
  taosMemoryFreeClear(pInfo);
117!
124
}
125

126
static int32_t doOpenExternalWindow(SOperatorInfo* pOperator);
127
static int32_t externalWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
128

129
typedef struct SMergeAlignedExternalWindowOperator {
130
  SExternalWindowOperator* pExtW;
131
  int64_t curTs;
132
  SSDataBlock* pPrefetchedBlock;
133
  SResultRow*  pResultRow;
134
} SMergeAlignedExternalWindowOperator;
135

136
void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
×
137
  SMergeAlignedExternalWindowOperator* pMlExtInfo = (SMergeAlignedExternalWindowOperator*)pOperator;
×
138
  destroyExternalWindowOperatorInfo(pMlExtInfo->pExtW);
×
139
}
×
140

141
void doMergeAlignExternalWindow(SOperatorInfo* pOperator);
142

143
static int32_t mergeAlignedExternalWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
144
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
×
145
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
×
146
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
×
147
  int32_t                              code = 0;
×
148
  int32_t lino = 0;
×
149

150
  if (pOperator->status == OP_EXEC_DONE) {
×
151
    (*ppRes) = NULL;
×
152
    return TSDB_CODE_SUCCESS;
×
153
  }
154

155
  SSDataBlock* pRes = pExtW->binfo.pRes;
×
156
  blockDataCleanup(pRes);
×
157

158
  if (!pExtW->pWins) {
×
159
    size_t size = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
×
160
    pExtW->pWins = taosArrayInit(size, sizeof(STimeWindow));
×
161
    if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _end);
×
162
    // pExtW->pOffsetList = taosArrayInit(size, sizeof(SLimitInfo));
163
    // if (!pExtW->pOffsetList) QUERY_CHECK_CODE(terrno, lino, _end);
164

165
    for (int32_t i = 0; i < size; ++i) {
×
166
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
×
167
      STimeWindow win = {.skey = pParam->wstart, .ekey = pParam->wend};
×
168
      if (pTaskInfo->pStreamRuntimeInfo->funcInfo.triggerType != 1){  // 1 meams STREAM_TRIGGER_SLIDING
×
169
        win.ekey++;
×
170
      }
171
      TSDB_CHECK_NULL(taosArrayPush(pExtW->pWins, &win), code, lino, _end, terrno);
×
172
    }
173
    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
×
174
  }
175

176
  doMergeAlignExternalWindow(pOperator);
×
177
  size_t rows = pRes->info.rows;
×
178
  pOperator->resultInfo.totalRows += rows;
×
179
  (*ppRes) = (rows == 0) ? NULL : pRes;
×
180

181
_end:
×
182
  if (code != 0) {
×
183
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
184
    pTaskInfo->code = code;
×
185
    T_LONG_JMP(pTaskInfo->env, code);
×
186
  }
187
  return code;
×
188
}
189

190
int32_t resetMergeAlignedExternalWindowOperator(SOperatorInfo* pOperator) {
×
191
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
×
192
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
×
193
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
×
194
  SMergeAlignedIntervalPhysiNode * pPhynode = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;
×
195
  pOperator->status = OP_NOT_OPENED;
×
196

197
  taosArrayDestroy(pExtW->pWins);
×
198
  pExtW->pWins = NULL;
×
199

200
  resetBasicOperatorState(&pExtW->binfo);
×
201
  pMlExtInfo->pResultRow = NULL;
×
202
  pMlExtInfo->curTs = INT64_MIN;
×
203
  if (pMlExtInfo->pPrefetchedBlock) blockDataCleanup(pMlExtInfo->pPrefetchedBlock);
×
204

205
  int32_t code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
206
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
207
                             &pTaskInfo->storageAPI.functionStore);
208
  if (code == 0) {
×
209
    colDataDestroy(&pExtW->twAggSup.timeWindowData);
×
210
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
×
211
  }
212
  return code;
×
213
}
214

215
int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
×
216
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
217
  SMergeAlignedIntervalPhysiNode* pPhynode = (SMergeAlignedIntervalPhysiNode*)pNode;
×
218
  int32_t code = 0;
×
219
  int32_t lino = 0;
×
220
  SMergeAlignedExternalWindowOperator* pMlExtInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
×
221
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
222

223
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
×
224
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
×
225
  }
226
  pOperator->pPhyNode = pNode;
×
227
  if (!pMlExtInfo || !pOperator) {
×
228
    code = terrno;
×
229
    goto _error;
×
230
  }
231

232
  pMlExtInfo->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
×
233
  if (!pMlExtInfo->pExtW) {
×
234
    code = terrno;
×
235
    goto _error;
×
236
  }
237

238
  SExternalWindowOperator* pExtW = pMlExtInfo->pExtW;
×
239
  SExprSupp* pSup = &pOperator->exprSupp;
×
240
  pSup->hasWindowOrGroup = true;
×
241
  pMlExtInfo->curTs = INT64_MIN;
×
242

243
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
×
244
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
×
245
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
×
246
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
×
247

248
  // pExtW->limitInfo = (SLimitInfo){0};
249
  // initLimitInfo(pPhynode->window.node.pLimit, pPhynode->window.node.pSlimit, &pExtW->limitInfo);
250

251
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
252
  initResultSizeInfo(&pOperator->resultInfo, 512);
×
253

254
  int32_t num = 0;
×
255
  SExprInfo* pExprInfo = NULL;
×
256
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
×
257
  QUERY_CHECK_CODE(code, lino, _error);
×
258

259
  if (pExtW->mode == EEXT_MODE_AGG) {
×
260
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
261
                      &pTaskInfo->storageAPI.functionStore);
262
    QUERY_CHECK_CODE(code, lino, _error);
×
263
  }
264

265
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
×
266
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
267
  initBasicInfo(&pExtW->binfo, pResBlock);
×
268

269
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
×
270
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
×
271
  QUERY_CHECK_CODE(code, lino, _error);
×
272
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMlExtInfo, pTaskInfo);
×
273
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedExternalWindowNext, NULL,
×
274
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
275
                                         optrDefaultGetNextExtFn, NULL);
276
  setOperatorResetStateFn(pOperator, resetMergeAlignedExternalWindowOperator);
×
277

278
  code = appendDownstream(pOperator, &pDownstream, 1);
×
279
  QUERY_CHECK_CODE(code, lino, _error);
×
280
  *ppOptrOut = pOperator;
×
281
  return code;
×
282
_error:
×
283
  if (pMlExtInfo) destroyMergeAlignedExternalWindowOperator(pMlExtInfo);
×
284
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
285
  pTaskInfo->code = code;
×
286
  return code;
×
287
}
288

289
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
83✔
290
  SExternalWindowOperator* pExtW = pOperator->info;
83✔
291
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
83✔
292
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pOperator->pPhyNode;
83✔
293
  pOperator->status = OP_NOT_OPENED;
83✔
294

295
  resetBasicOperatorState(&pExtW->binfo);
83✔
296
  pExtW->outputWinId = 0;
83✔
297
  pExtW->lastWinId = -1;
83✔
298
  taosArrayDestroyEx(pExtW->pOutputBlocks, blockListDestroy);
83✔
299
  // taosArrayDestroy(pExtW->pOffsetList);
300
  taosArrayDestroy(pExtW->pWins);
83✔
301
  pExtW->pWins = NULL;
83✔
302
  pExtW->pOutputBlockListNode = NULL;
83✔
303
  pExtW->pOutputBlocks = NULL;
83✔
304
  // pExtW->pOffsetList = NULL;
305
  initResultSizeInfo(&pOperator->resultInfo, 512);
83✔
306
  int32_t code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
83✔
307
  if (code == 0) {
83!
308
    code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
83✔
309
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
83✔
310
                       &pTaskInfo->storageAPI.functionStore);
311
  }
312
  if (code == 0) {
83!
313
    code = resetExprSupp(&pExtW->scalarSupp, pTaskInfo, pPhynode->window.pProjs, NULL,
83✔
314
                         &pTaskInfo->storageAPI.functionStore);
315
  }
316
  if (code == 0) {
83!
317
    colDataDestroy(&pExtW->twAggSup.timeWindowData);
83✔
318
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
83✔
319
  }
320
  if (code == 0) {
83!
321
    cleanupGroupResInfo(&pExtW->groupResInfo);
83✔
322
  }
323
  blockDataDestroy(pExtW->pTmpBlock);
83✔
324
  pExtW->pTmpBlock = NULL;
83✔
325
  return code;
83✔
326
}
327

328
static EDealRes extWindowHasCountLikeFunc(SNode* pNode, void* res) {
801✔
329
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
801✔
330
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
287✔
331
    if (fmIsCountLikeFunc(pFunc->funcId) || (pFunc->hasOriginalFunc && fmIsCountLikeFunc(pFunc->originalFuncId))) {
287!
332
      *(bool*)res = true;
116✔
333
      return DEAL_RES_END;
116✔
334
    }
335
  }
336
  return DEAL_RES_CONTINUE;
685✔
337
}
338

339

340
static int32_t extWindowCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
116✔
341
  int32_t code = TSDB_CODE_SUCCESS;
116✔
342
  int32_t lino = 0;
116✔
343
  SSDataBlock* pBlock = NULL;
116✔
344
  if (!tsCountAlwaysReturnValue) {
116!
345
    return TSDB_CODE_SUCCESS;
×
346
  }
347

348
  SExternalWindowOperator* pExtW = pOperator->info;
116✔
349

350
  if (!pExtW->hasCountFunc) {
116!
351
    return TSDB_CODE_SUCCESS;
×
352
  }
353

354
  code = createDataBlock(&pBlock);
116✔
355
  if (code) {
116!
356
    return code;
×
357
  }
358

359
  pBlock->info.rows = 1;
116✔
360
  pBlock->info.capacity = 0;
116✔
361

362
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
553✔
363
    SColumnInfoData colInfo = {0};
437✔
364
    colInfo.hasNull = true;
437✔
365
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
437✔
366
    colInfo.info.bytes = 1;
437✔
367

368
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
437✔
369
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
939✔
370
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
502✔
371
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
502✔
372
        int32_t slotId = pFuncParam->pCol->slotId;
359✔
373
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
359✔
374
        if (slotId >= numOfCols) {
359✔
375
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
212✔
376
          QUERY_CHECK_CODE(code, lino, _end);
212!
377

378
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
424✔
379
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
212✔
380
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
212!
381
          }
382
        }
383
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
143✔
384
        // do nothing
385
      }
386
    }
387
  }
388

389
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
116✔
390
  QUERY_CHECK_CODE(code, lino, _end);
116!
391

392
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
328✔
393
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
212✔
394
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
212!
395
    colDataSetNULL(pColInfoData, 0);
396
  }
397
  *ppBlock = pBlock;
116✔
398

399
_end:
116✔
400
  if (code != TSDB_CODE_SUCCESS) {
116!
401
    blockDataDestroy(pBlock);
×
402
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
403
  }
404
  return code;
116✔
405
}
406

407
int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
117✔
408
                                     SOperatorInfo** pOptrOut) {
409
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
117✔
410
  QRY_PARAM_CHECK(pOptrOut);
117!
411
  int32_t                  code = 0;
117✔
412
  int32_t                  lino = 0;
117✔
413
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
117!
414
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
117!
415
  pOperator->pPhyNode = pNode;
117✔
416
  if (!pExtW || !pOperator) {
117!
417
    code = terrno;
×
418
    lino = __LINE__;
×
419
    goto _error;
×
420
  }
421
  
422
  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
117✔
423
                  pExtW, pTaskInfo);
424
                  
425
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
117!
426
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
117✔
427
  }
428
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
117✔
429
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
117!
430
  initBasicInfo(&pExtW->binfo, pResBlock);
117✔
431

432
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
117✔
433
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
117!
434
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
117✔
435
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
117✔
436

437
  // pExtW->limitInfo = (SLimitInfo){0};
438
  // initLimitInfo(pPhynode->window.node.pLimit, pPhynode->window.node.pSlimit, &pExtW->limitInfo);
439

440
  if (pPhynode->window.pProjs) {
117!
441
    int32_t    numOfScalarExpr = 0;
×
442
    SExprInfo* pScalarExprInfo = NULL;
×
443
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
×
444
    QUERY_CHECK_CODE(code, lino, _error);
×
445

446
    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
×
447
    QUERY_CHECK_CODE(code, lino, _error);
×
448
  } else if (pExtW->mode == EEXT_MODE_AGG) {
117!
449
    if (pPhynode->window.pExprs != NULL) {
117!
450
      int32_t    num = 0;
×
451
      SExprInfo* pSExpr = NULL;
×
452
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
453
      QUERY_CHECK_CODE(code, lino, _error);
×
454
    
455
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
456
      if (code != TSDB_CODE_SUCCESS) {
×
457
        goto _error;
×
458
      }
459
    }
460
    
461
    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
117✔
462
    initResultSizeInfo(&pOperator->resultInfo, 512);
117✔
463
    code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
117✔
464
    QUERY_CHECK_CODE(code, lino, _error);
117!
465

466
    int32_t num = 0;
117✔
467
    SExprInfo* pExprInfo = NULL;
117✔
468
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
117✔
469
    QUERY_CHECK_CODE(code, lino, _error);
117!
470
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, 0, 0);
117✔
471
    QUERY_CHECK_CODE(code, lino, _error);
117!
472

473
    nodesWalkExprs(pPhynode->window.pFuncs, extWindowHasCountLikeFunc, &pExtW->hasCountFunc);
117✔
474
    if (pExtW->hasCountFunc) {
117✔
475
      code = extWindowCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
116✔
476
      QUERY_CHECK_CODE(code, lino, _error);
116!
477
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
116✔
478
    } else {
479
      qDebug("%s ext window doesn't have CountLikeFunc", pOperator->pTaskInfo->id.str);
1!
480
    }
481

482
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
117✔
483
    QUERY_CHECK_CODE(code, lino, _error);
117!
484
  } else {
485
    size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
486
    
487
    if (pPhynode->window.pExprs != NULL) {
×
488
      int32_t    num = 0;
×
489
      SExprInfo* pSExpr = NULL;
×
490
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
491
      QUERY_CHECK_CODE(code, lino, _error);
×
492
    
493
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
494
      if (code != TSDB_CODE_SUCCESS) {
×
495
        goto _error;
×
496
      }
497
    }
498
    
499
    initResultSizeInfo(&pOperator->resultInfo, 512);
×
500
    code = blockDataEnsureCapacity(pResBlock, 512);
×
501
    TSDB_CHECK_CODE(code, lino, _error);
×
502
    
503
    int32_t    numOfExpr = 0;
×
504
    SExprInfo* pExprInfo = NULL;
×
505
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
×
506
    TSDB_CHECK_CODE(code, lino, _error);
×
507
    
508
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
×
509
                              pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
510
    TSDB_CHECK_CODE(code, lino, _error);
×
511
    pOperator->exprSupp.hasWindowOrGroup = false;
×
512
    
513
    code = setFunctionResultOutput(pOperator, &pExtW->binfo, &pExtW->aggSup, MAIN_SCAN, numOfExpr);
×
514
    TSDB_CHECK_CODE(code, lino, _error);
×
515
    
516
    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
×
517
                              pTaskInfo->pStreamRuntimeInfo);
×
518
    TSDB_CHECK_CODE(code, lino, _error);
×
519
    
520
    pExtW->binfo.pRes = pResBlock;
×
521
    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
×
522
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
×
523
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
×
524
    TSDB_CHECK_CODE(code, lino, _error);
×
525
  }
526

527
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
117✔
528

529
  pOperator->fpSet = createOperatorFpSet(doOpenExternalWindow, externalWindowNext, NULL, destroyExternalWindowOperatorInfo,
117✔
530
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
531
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
117✔
532
  code = appendDownstream(pOperator, &pDownstream, 1);
117✔
533
  if (code != 0) {
117!
534
    goto _error;
×
535
  }
536

537
  *pOptrOut = pOperator;
117✔
538
  return code;
117✔
539

540
_error:
×
541

542
  if (pExtW != NULL) {
×
543
    destroyExternalWindowOperatorInfo(pExtW);
×
544
  }
545

546
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
547
  pTaskInfo->code = code;
×
548
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
×
549
  return code;
×
550
}
551

552
int64_t* extractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
313✔
553
  TSKEY* tsCols = NULL;
313✔
554

555
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
313!
556
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
313✔
557
    if (!pColDataInfo) {
313!
558
      pTaskInfo->code = terrno;
×
559
      T_LONG_JMP(pTaskInfo->env, terrno);
×
560
    }
561

562
    tsCols = (int64_t*)pColDataInfo->pData;
313✔
563
    if (tsCols[0] == 0) {
313!
564
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
×
565
            tsCols[pBlock->info.rows - 1]);
566
    }
567

568
    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
313!
569
      int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
313✔
570
      if (code != TSDB_CODE_SUCCESS) {
313!
571
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
572
        pTaskInfo->code = code;
×
573
        T_LONG_JMP(pTaskInfo->env, code);
×
574
      }
575
    }
576
  }
577

578
  return tsCols;
313✔
579
}
580

581
static int32_t getExtWinCurIdx(SExecTaskInfo* pTaskInfo) {
1,657✔
582
  return pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
1,657✔
583
}
584

585
static void incExtWinCurIdx(SOperatorInfo* pOperator) {
214✔
586
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
214✔
587
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx++;
214✔
588
}
214✔
589

590
static void setExtWinCurIdx(SOperatorInfo* pOperator, int32_t idx) {
871✔
591
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
871✔
592
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
871✔
593
}
871✔
594

595

596
static void incExtWinOutIdx(SOperatorInfo* pOperator) {
×
597
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
598
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curOutIdx++;
×
599
}
×
600

601
static const STimeWindow* getExtWindow(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts) {
313✔
602
  // TODO handle desc order
603
  for (int32_t i = 0; i < pExtW->pWins->size; ++i) {
820!
604
    const STimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
820✔
605
    if (ts >= pWin->skey && ts < pWin->ekey) {
820!
606
      setExtWinCurIdx(pOperator, i);
313✔
607
      return pWin;
313✔
608
    }
609
  }
610
  return NULL;
×
611
}
612

613
static const STimeWindow* getExtNextWindow(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
527✔
614
  int32_t curIdx = getExtWinCurIdx(pTaskInfo);
527✔
615
  if (curIdx + 1 >= pExtW->pWins->size) return NULL;
527✔
616
  return taosArrayGet(pExtW->pWins, curIdx + 1);
343✔
617
}
618

619
static int32_t getNextStartPos(STimeWindow win, const SDataBlockInfo* pBlockInfo, int32_t lastEndPos, int32_t order, int32_t* nextPos, int64_t* tsCol) {
343✔
620
  bool ascQuery = order == TSDB_ORDER_ASC;
343✔
621

622
  if (win.ekey <= pBlockInfo->window.skey && ascQuery) {
343!
623
    return -2;
×
624
  }
625
//if (win.skey > pBlockInfo->window.ekey && !ascQuery) return -2;
626

627
  if (win.skey > pBlockInfo->window.ekey && ascQuery) return -1;
343!
628
//if (win.ekey < pBlockInfo->window.skey && !ascQuery) return -1;
629

630
  while (true) {
631
    if (win.ekey <= tsCol[lastEndPos + 1] && ascQuery) return -2;
214!
632
    if (win.skey <= tsCol[lastEndPos + 1] && ascQuery) break;
195!
633
    lastEndPos++;
×
634
  }
635

636
  *nextPos = lastEndPos + 1;
195✔
637
  return 0;
195✔
638
}
639

640
static int32_t setExtWindowOutputBuf(SResultRowInfo* pResultRowInfo, const STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
558✔
641
                                     int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
642
                                     SExecTaskInfo* pTaskInfo) {
643
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
558✔
644
                                                  true, tableGroupId, pTaskInfo, true, pAggSup, true);
645

646
  if (pResultRow == NULL || pTaskInfo->code != 0) {
558!
647
    *pResult = NULL;
×
648
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
649
    return pTaskInfo->code;
×
650
  }
651

652
  qDebug("current result rows num:%d", tSimpleHashGetSize(pAggSup->pResultRowHashTable));
558✔
653

654
  // set time window for current result
655
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
558✔
656
  *pResult = pResultRow;
558✔
657
  pTaskInfo->code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
558✔
658
  if (pTaskInfo->code) {
558!
659
    *pResult = NULL;
×
660
    qError("failed to set result row ctx, error:%s", tstrerror(pTaskInfo->code));
×
661
    return pTaskInfo->code;
×
662
  }
663

664
  return TSDB_CODE_SUCCESS;
558✔
665
}
666

667
static int32_t extWindowDoHashAgg(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
558✔
668
                                  SSDataBlock* pInputBlock) {
669
  if (forwardRows == 0) return 0;
558!
670
  SExprSupp*               pSup = &pOperator->exprSupp;
558✔
671
  SExternalWindowOperator* pExtW = pOperator->info;
558✔
672
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
558✔
673
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
558✔
674
}
675

676
static SSDataBlock* extWindowGetOutputBlock(SOperatorInfo* pOperator, int32_t winIdx) {
×
677
  SExternalWindowOperator* pExtW = pOperator->info;
×
678
  SSDataBlock*             pBlock = NULL;
×
679
  SBlockList*              pBlockList = taosArrayGet(pExtW->pOutputBlocks, winIdx);
×
680
  int32_t                  code = blockListGetLastBlock(pBlockList, &pBlock);
×
681
  if (code != 0) terrno = code;
×
682
  return pBlock;
×
683
}
684

685
static int32_t extWindowCopyRows(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos,
×
686
                                 int32_t forwardRows) {
687
  if (forwardRows == 0) return 0;
×
688
  SExternalWindowOperator* pExtW = pOperator->info;
×
689
  SSDataBlock*             pResBlock = extWindowGetOutputBlock(pOperator, getExtWinCurIdx(pOperator->pTaskInfo));
×
690
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
×
691

692
  if (!pResBlock) {
×
693
    qError("%s failed to get output block for ext window:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
694
    return terrno;
×
695
  }
696
  int32_t rowsToCopy = forwardRows;
×
697
  int32_t code = 0;
×
698
  if (!pExtW->pTmpBlock)
×
699
    code = createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock);
×
700
  else
701
    blockDataCleanup(pExtW->pTmpBlock);
×
702
  if (code) {
×
703
    qError("%s failed to create datablock:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
704
    return code;
×
705
  }
706
  code = blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rowsToCopy));
×
707
  if (code) {
×
708
    qError("%s failed to ensure capacity:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
709
    return code;
×
710
  }
711
  if (rowsToCopy > 0) {
×
712
    code = blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rowsToCopy);
×
713
    if (code == 0) {
×
714
      code = projectApplyFunctionsWithSelect(pExprSup->pExprInfo, pResBlock, pExtW->pTmpBlock, pExprSup->pCtx, pExprSup->numOfExprs,
×
715
          NULL, GET_STM_RTINFO(pOperator->pTaskInfo), true);
×
716
    }
717
  }
718

719
  if (code != 0) return code;
×
720

721
  return code;
×
722
}
723

724
static int32_t hashExternalWindowProject(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
×
725
  SExprSupp*               pSup = &pOperator->exprSupp;
×
726
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
×
727
  SExternalWindowOperator* pExtW = pOperator->info;
×
728
  SqlFunctionCtx*          pCtx = NULL;
×
729
  int64_t*                 tsCol = extractTsCol(pInputBlock, pExtW->primaryTsIndex, pTaskInfo);
×
730
  TSKEY                    ts = getStartTsKey(&pInputBlock->info.window, tsCol);
×
731
  const STimeWindow*       pWin = getExtWindow(pOperator, pExtW, ts);
×
732
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
×
733
  int32_t                  tsOrder = pExtW->binfo.inputTsOrder;
×
734
  int32_t startPos = 0;
×
735

736
  if (!pWin) return 0;
×
737
  TSKEY   ekey = ascScan ? pWin->ekey : pWin->skey;
×
738
  int32_t forwardRows =
739
      getNumOfRowsInTimeWindow(&pInputBlock->info, tsCol, startPos, ekey-1, binarySearchForKey, NULL, tsOrder);
×
740

741
  int32_t code = extWindowCopyRows(pOperator, pInputBlock, startPos, forwardRows);
×
742

743
  while (code == 0 && pInputBlock->info.rows > startPos + forwardRows) {
×
744
    pWin = getExtNextWindow(pExtW, pTaskInfo);
×
745
    if (!pWin) break;
×
746
    incExtWinCurIdx(pOperator);
×
747

748
    startPos = startPos + forwardRows;
×
749
    ekey = ascScan ? pWin->ekey : pWin->skey;
×
750
    forwardRows =
751
        getNumOfRowsInTimeWindow(&pInputBlock->info, tsCol, startPos, ekey-1, binarySearchForKey, NULL, tsOrder);
×
752
    code = extWindowCopyRows(pOperator, pInputBlock, startPos, forwardRows);
×
753
  }
754
  return code;
×
755
}
756

757
static int32_t extWindowDoIndefRows(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
×
758
  SExternalWindowOperator* pExtW = pOperator->info;
×
759
  SOptrBasicInfo*     pInfo = &pExtW->binfo;
×
760
  SExprSupp*          pSup = &pOperator->exprSupp;
×
761
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
×
762
  int32_t order = pInfo->inputTsOrder;
×
763
  int32_t scanFlag = pBlock->info.scanFlag;
×
764
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
×
765

766
  SExprSupp* pScalarSup = &pExtW->scalarSupp;
×
767
  if (pScalarSup->pExprInfo != NULL) {
×
768
    TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
769
                                 pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
770
  }
771

772
  TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, order, scanFlag, false));
×
773

774
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));
×
775

776
  TAOS_CHECK_EXIT(projectApplyFunctions(pSup->pExprInfo, pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
×
777
                               pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
778

779
_exit:
×
780

781
  if (code) {
×
782
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
783
  }
784

785
  return code;
×
786
}
787

788

789
static int32_t extWindowHandleIndefRows(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos,
×
790
                                 int32_t forwardRows) {
791
  if (forwardRows == 0) return 0;
×
792
  SExternalWindowOperator* pExtW = pOperator->info;
×
793
  SSDataBlock*             pResBlock = extWindowGetOutputBlock(pOperator, getExtWinCurIdx(pOperator->pTaskInfo));
×
794

795
  if (!pResBlock) {
×
796
    qError("%s failed to get output block for ext window:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
797
    return terrno;
×
798
  }
799
  
800
  int32_t rowsToCopy = forwardRows;
×
801
  int32_t code = 0;
×
802
  if (!pExtW->pTmpBlock)
×
803
    code = createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock);
×
804
  else
805
    blockDataCleanup(pExtW->pTmpBlock);
×
806
    
807
  if (code) {
×
808
    qError("%s failed to create datablock:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
809
    return code;
×
810
  }
811
  code = blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rowsToCopy));
×
812
  if (code) {
×
813
    qError("%s failed to ensure capacity:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
814
    return code;
×
815
  }
816
  
817
  if (rowsToCopy > 0) {
×
818
    code = blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rowsToCopy);
×
819
    if (code == 0) {
×
820
      code = extWindowDoIndefRows(pOperator, pResBlock, pExtW->pTmpBlock);
×
821
    }
822
  }
823

824
  return code;
×
825
}
826

827

828
static int32_t hashExternalWindowIndefRows(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
×
829
  SExprSupp*               pSup = &pOperator->exprSupp;
×
830
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
×
831
  SExternalWindowOperator* pExtW = pOperator->info;
×
832
  SqlFunctionCtx*          pCtx = NULL;
×
833
  int64_t*                 tsCol = extractTsCol(pInputBlock, pExtW->primaryTsIndex, pTaskInfo);
×
834
  TSKEY                    ts = getStartTsKey(&pInputBlock->info.window, tsCol);
×
835
  const STimeWindow*       pWin = getExtWindow(pOperator, pExtW, ts);
×
836
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
×
837
  int32_t                  tsOrder = pExtW->binfo.inputTsOrder;
×
838
  int32_t                  startPos = 0;
×
839

840
  if (!pWin) return 0;
×
841
  TSKEY   ekey = ascScan ? pWin->ekey : pWin->skey;
×
842
  int32_t forwardRows =
843
      getNumOfRowsInTimeWindow(&pInputBlock->info, tsCol, startPos, ekey-1, binarySearchForKey, NULL, tsOrder);
×
844

845
  int32_t code = extWindowHandleIndefRows(pOperator, pInputBlock, startPos, forwardRows);
×
846

847
  while (code == 0 && pInputBlock->info.rows > startPos + forwardRows) {
×
848
    pWin = getExtNextWindow(pExtW, pTaskInfo);
×
849
    if (!pWin) break;
×
850
    incExtWinCurIdx(pOperator);
×
851

852
    startPos = startPos + forwardRows;
×
853
    ekey = ascScan ? pWin->ekey : pWin->skey;
×
854
    forwardRows =
855
        getNumOfRowsInTimeWindow(&pInputBlock->info, tsCol, startPos, ekey-1, binarySearchForKey, NULL, tsOrder);
×
856
    code = extWindowHandleIndefRows(pOperator, pInputBlock, startPos, forwardRows);
×
857
  }
858
  
859
  return code;
×
860
}
861

862

863
static int32_t extWindowProcessEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains) {
622✔
864
  int32_t code = 0, lino = 0;
622✔
865
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
622✔
866
  bool ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
622✔
867
  int32_t currIdx = getExtWinCurIdx(pOperator->pTaskInfo);
622✔
868
  SExprSupp* pSup = &pOperator->exprSupp;
622✔
869

870
  if (NULL == pExtW->pEmptyInputBlock) {
622✔
871
    goto _exit;
3✔
872
  }
873

874
  int32_t endIdx = allRemains ? (pExtW->pWins->size - 1) : (currIdx - 1);
619✔
875
  SResultRowInfo* pResultRowInfo = &pExtW->binfo.resultRowInfo;
619✔
876
  SResultRow* pResult = NULL;
619✔
877
  SSDataBlock* pInput = pExtW->pEmptyInputBlock;
619✔
878

879
  if ((pExtW->lastWinId + 1) <= endIdx) {
619✔
880
    TAOS_CHECK_EXIT(setInputDataBlock(pSup, pExtW->pEmptyInputBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
33!
881
  }
882
  
883
  for (int32_t i = pExtW->lastWinId + 1; i <= endIdx; ++i) {
669✔
884
    const STimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
50✔
885

886
    setExtWinCurIdx(pOperator, i);
50✔
887
    qDebug("%s %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
50✔
888
           GET_TASKID(pOperator->pTaskInfo), i, pWin->skey, pWin->ekey, ascScan);
889

890
    TAOS_CHECK_EXIT(setExtWindowOutputBuf(pResultRowInfo, pWin, &pResult, pInput->info.id.groupId, pSup->pCtx, pSup->numOfExprs,
50!
891
                                pSup->rowEntryInfoOffset, &pExtW->aggSup, pOperator->pTaskInfo));
892

893
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, pWin, 1);
50✔
894
    code = extWindowDoHashAgg(pOperator, 0, 1, pInput);
50✔
895
    pExtW->lastWinId = i;  
50✔
896
    TAOS_CHECK_EXIT(code);
50!
897
  }
898

899
  
900
_exit:
619✔
901

902
  if (code) {
622!
903
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
904
  } else {
905
    if (pBlock) {
622✔
906
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
508!
907
    }
908

909
    if (!allRemains) {
622✔
910
      setExtWinCurIdx(pOperator, currIdx);  
508✔
911
    }
912
  }
913

914
  return code;
622✔
915
}
916

917
static void hashExternalWindowAgg(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
313✔
918
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
313✔
919
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
313✔
920
  SExprSupp*               pSup = &pOperator->exprSupp;
313✔
921
  SResultRowInfo*          pResultRowInfo = &pExtW->binfo.resultRowInfo;
313✔
922
  int32_t                  startPos = 0;
313✔
923
  int32_t                  numOfOutput = pSup->numOfExprs;
313✔
924
  int64_t*                 tsCols = extractTsCol(pInputBlock, pExtW->primaryTsIndex, pTaskInfo);
313✔
925
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
313✔
926
  TSKEY                    ts = getStartTsKey(&pInputBlock->info.window, tsCols);
313✔
927
  SResultRow*              pResult = NULL;
313✔
928
  int32_t                  ret = 0;
313✔
929

930
  const STimeWindow* pWin = getExtWindow(pOperator, pExtW, ts);
313✔
931
  if (pWin == NULL) {
313!
932
    qError("%s failed to get time window for ts:%" PRId64 ", error:%s", GET_TASKID(pOperator->pTaskInfo), ts, tstrerror(terrno));
×
933
    return;
×
934
  }
935

936
  ret = extWindowProcessEmptyWins(pOperator, pInputBlock, false);
313✔
937
  if (ret != 0) {
313!
938
    T_LONG_JMP(pTaskInfo->env, ret);
×
939
  }
940
  
941
  qDebug("%s ext window1 start:%" PRId64 ", end:%" PRId64 ", ts:%" PRId64 ", ascScan:%d",
313✔
942
         GET_TASKID(pOperator->pTaskInfo), pWin->skey, pWin->ekey, ts, ascScan);        
943

944
  SExprSupp* pScalarSup = &pExtW->scalarSupp;
313✔
945
  if (pScalarSup->pExprInfo != NULL) {
313!
946
    ret = projectApplyFunctions(pScalarSup->pExprInfo, pInputBlock, pInputBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
947
                                 pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
×
948
    if (ret != 0) {
×
949
      T_LONG_JMP(pTaskInfo->env, ret);
×
950
    }
951
  }
952

953
  STimeWindow win = *pWin;
313✔
954
  ret = setExtWindowOutputBuf(pResultRowInfo, &win, &pResult, pInputBlock->info.id.groupId, pSup->pCtx, numOfOutput,
313✔
955
                              pSup->rowEntryInfoOffset, &pExtW->aggSup, pTaskInfo);
956
  if (ret != 0 || !pResult) {
313!
957
    T_LONG_JMP(pTaskInfo->env, ret);
×
958
  }
959
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
313!
960
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pInputBlock->info, tsCols, startPos, ekey - 1, binarySearchForKey, NULL,
313✔
961
                                                 pExtW->binfo.inputTsOrder);
962

963
  updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &win, 1);
313✔
964
  ret = extWindowDoHashAgg(pOperator, startPos, forwardRows, pInputBlock);
313✔
965
  pExtW->lastWinId = getExtWinCurIdx(pTaskInfo);
313✔
966

967
  if (ret != 0) {
313!
968
    T_LONG_JMP(pTaskInfo->env, ret);
×
969
  }
970

971
  int32_t nextPosGot = 0;
313✔
972
  while (1) {
214✔
973
    int32_t prevEndPos = forwardRows + startPos - 1;
527✔
974
    if (prevEndPos >= pInputBlock->info.rows) {
527!
975
      break;
×
976
    }
977
    
978
    pWin = getExtNextWindow(pExtW, pTaskInfo);
527✔
979
    if (!pWin)
527✔
980
      break;
184✔
981
    else
982
      win = *pWin;
343✔
983
      
984
    qDebug("%s ext window2 start:%" PRId64 ", end:%" PRId64 ", ts:%" PRId64 ", ascScan:%d",
343✔
985
           GET_TASKID(pOperator->pTaskInfo), win.skey, win.ekey, ts, ascScan);
986
           
987
    nextPosGot = getNextStartPos(win, &pInputBlock->info, prevEndPos, pExtW->binfo.inputTsOrder, &startPos, tsCols);
343✔
988
    if (-1 == nextPosGot) {
343✔
989
      qDebug("%s ignore current block", GET_TASKID(pOperator->pTaskInfo));
129!
990
      break;
129✔
991
    }
992
    if (-2 == nextPosGot) {
214✔
993
      qDebug("%s skip current window", GET_TASKID(pOperator->pTaskInfo));
19!
994
      incExtWinCurIdx(pOperator);
19✔
995
      continue;
19✔
996
    }
997
    
998
    incExtWinCurIdx(pOperator);
195✔
999

1000
    ret = extWindowProcessEmptyWins(pOperator, pInputBlock, false);
195✔
1001
    if (ret != 0) {
195!
1002
      T_LONG_JMP(pTaskInfo->env, ret);
×
1003
    }
1004

1005
    ekey = ascScan ? win.ekey : win.skey;
195!
1006
    forwardRows = getNumOfRowsInTimeWindow(&pInputBlock->info, tsCols, startPos, ekey - 1, binarySearchForKey, NULL,
195✔
1007
                                           pExtW->binfo.inputTsOrder);
1008

1009
    ret = setExtWindowOutputBuf(pResultRowInfo, &win, &pResult, pInputBlock->info.id.groupId, pSup->pCtx,
195✔
1010
                                numOfOutput, pSup->rowEntryInfoOffset, &pExtW->aggSup, pTaskInfo);
1011
    if (ret != 0 || !pResult) T_LONG_JMP(pTaskInfo->env, ret);
195!
1012

1013
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &win, 1);
195✔
1014
    ret = extWindowDoHashAgg(pOperator, startPos, forwardRows, pInputBlock);
195✔
1015
    pExtW->lastWinId = getExtWinCurIdx(pTaskInfo);  
195✔
1016
    if (ret != 0) {
195!
1017
      T_LONG_JMP(pTaskInfo->env, ret);
×
1018
    }
1019
  }
1020
}
1021

1022

1023
static int32_t doOpenExternalWindow(SOperatorInfo* pOperator) {
120✔
1024
  if (OPTR_IS_OPENED(pOperator)) return TSDB_CODE_SUCCESS;
120!
1025
  int32_t                  code = 0;
120✔
1026
  int32_t                  lino = 0;
120✔
1027
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
120✔
1028
  SOperatorInfo*           pDownstream = pOperator->pDownstream[0];
120✔
1029
  SExternalWindowOperator* pExtW = pOperator->info;
120✔
1030
  SExprSupp*               pSup = &pOperator->exprSupp;
120✔
1031
  pTaskInfo->pStreamRuntimeInfo->funcInfo.extWinProjMode = (pExtW->mode != EEXT_MODE_AGG);
120✔
1032

1033
  int32_t scanFlag = MAIN_SCAN;
120✔
1034
  int64_t st = taosGetTimestampUs();
120✔
1035

1036
  if (!pExtW->pWins) {
120!
1037
    size_t size = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
120✔
1038
    pExtW->pWins = taosArrayInit(size, sizeof(STimeWindow));
120✔
1039
    if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _exit);
120!
1040
    pExtW->pOutputBlocks = taosArrayInit(size, sizeof(SBlockList));
120✔
1041
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _exit);
120!
1042
    // pExtW->pOffsetList = taosArrayInit(size, sizeof(SLimitInfo));
1043
    // if (!pExtW->pOffsetList) QUERY_CHECK_CODE(terrno, lino, _end);
1044
    for (int32_t i = 0; i < size; ++i) {
501✔
1045
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
381✔
1046
      STimeWindow win = {.skey = pParam->wstart, .ekey = pParam->wend};
381✔
1047
      if (pTaskInfo->pStreamRuntimeInfo->funcInfo.triggerType != STREAM_TRIGGER_SLIDING) {
381✔
1048
        win.ekey++;
176✔
1049
      }
1050
      TSDB_CHECK_NULL(taosArrayPush(pExtW->pWins, &win), code, lino, _exit, terrno);
762!
1051

1052
      SBlockList bl = {.pSrcBlock = pExtW->binfo.pRes, .pBlocks = 0, .blockRowNumThreshold = 4096};
381✔
1053
      code = blockListInit(&bl, 4096);
381✔
1054
      if (code != 0) QUERY_CHECK_CODE(code, lino, _exit);
381!
1055
      TSDB_CHECK_NULL(taosArrayPush(pExtW->pOutputBlocks, &bl), code, lino, _exit, terrno);
762!
1056
    }
1057
    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
120✔
1058
    pExtW->lastWinId = -1;
120✔
1059
  }
1060

1061
  while (1) {
313✔
1062
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
433✔
1063
    if (pBlock == NULL) {
427✔
1064
      if (EEXT_MODE_AGG == pExtW->mode) {
114!
1065
        TAOS_CHECK_EXIT(extWindowProcessEmptyWins(pOperator, pBlock, true));
114!
1066
      }
1067
      break;
114✔
1068
    }
1069

1070
    printDataBlock(pBlock, __func__, "externalwindow");
313✔
1071

1072
    qDebug("ext windowpExtW->mode:%d", pExtW->mode);
313✔
1073
    switch (pExtW->mode) {
313!
1074
      case EEXT_MODE_SCALAR:
×
1075
        TAOS_CHECK_EXIT(hashExternalWindowProject(pOperator, pBlock));
×
1076
        break;
×
1077
      case EEXT_MODE_AGG:
313✔
1078
        hashExternalWindowAgg(pOperator, pBlock);
313✔
1079
        break;
313✔
1080
      case EEXT_MODE_INDEFR_FUNC:
×
1081
        TAOS_CHECK_EXIT(hashExternalWindowIndefRows(pOperator, pBlock));
×
1082
        break;
×
1083
      default:
×
1084
        break;
×
1085
    }
1086
  }
1087

1088
  OPTR_SET_OPENED(pOperator);
114✔
1089

1090
  if (pExtW->mode == EEXT_MODE_AGG) {
114!
1091
    qDebug("ext window before dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
114✔
1092

1093
    code = initGroupedResultInfo(&pExtW->groupResInfo, pExtW->aggSup.pResultRowHashTable, pExtW->binfo.inputTsOrder);
114✔
1094
    QUERY_CHECK_CODE(code, lino, _exit);
114!
1095

1096
    qDebug("ext window after dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
114✔
1097
  }
1098

1099
_exit:
22✔
1100

1101
  if (code != 0) {
114!
1102
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1103
    pTaskInfo->code = code;
×
1104
    T_LONG_JMP(pTaskInfo->env, code);
×
1105
  }
1106
  return code;
114✔
1107
}
1108

1109
static int32_t externalWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
199✔
1110
  int32_t                  code = 0;
199✔
1111
  int32_t                  lino = 0;
199✔
1112
  SExternalWindowOperator* pExtW = pOperator->info;
199✔
1113
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
199✔
1114
  SExprSupp*               pSup = &pOperator->exprSupp;
199✔
1115

1116
  if (pOperator->status == OP_EXEC_DONE) {
199✔
1117
    *ppRes = NULL;
80✔
1118
    return code;
80✔
1119
  }
1120

1121
  SSDataBlock* pBlock = pExtW->binfo.pRes;
119✔
1122
  blockDataCleanup(pBlock);
119✔
1123
  code = pOperator->fpSet._openFn(pOperator);
120✔
1124
  QUERY_CHECK_CODE(code, lino, _end);
114!
1125

1126
  while (1) {
1127
    if (pExtW->mode == EEXT_MODE_SCALAR || pExtW->mode == EEXT_MODE_INDEFR_FUNC) {
114!
1128
      if (pExtW->outputWinId >= taosArrayGetSize(pExtW->pOutputBlocks)) {
×
1129
        pBlock->info.rows = 0;
×
1130
        break;
×
1131
      }
1132
      SBlockList* pList = taosArrayGet(pExtW->pOutputBlocks, pExtW->outputWinId);
×
1133
      // SLimitInfo* pLimitInfo = taosArrayGet(pExtW->pOffsetList, pExtW->outputWinId);
1134
      if (pExtW->pOutputBlockListNode == NULL) {
×
1135
        pExtW->pOutputBlockListNode = tdListGetHead(pList->pBlocks);
×
1136
      } else {
1137
        if (!pExtW->pOutputBlockListNode->dl_next_) {
×
1138
          pExtW->pOutputBlockListNode = NULL;
×
1139
          pExtW->outputWinId++;
×
1140
          incExtWinOutIdx(pOperator);
×
1141
          continue;
×
1142
        }
1143
        pExtW->pOutputBlockListNode = pExtW->pOutputBlockListNode->dl_next_;
×
1144
      }
1145

1146
      if (pExtW->pOutputBlockListNode) {
×
1147
        code = blockDataMerge(pBlock, *(SSDataBlock**)pExtW->pOutputBlockListNode->data);
×
1148
        if (code != 0) goto _end;
×
1149

1150
        // int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock);
1151
        // if (status == EXTERNAL_RETRIEVE_CONTINUE) {
1152
        //   continue;
1153
        // } else if (status == EXTERNAL_RETRIEVE_NEXT_WINDOW) {
1154
        //   pExtW->pOutputBlockListNode = NULL;
1155
        //   pExtW->outputWinId++;
1156
        //   incExtWinOutIdx(pOperator);
1157
        //   continue;
1158
        // } else if (status == EXTERNAL_RETRIEVE_DONE) {
1159
        //   // do nothing, just break
1160
        // }
1161
      } else {
1162
        pExtW->outputWinId++;
×
1163
        incExtWinOutIdx(pOperator);
×
1164
        continue;
×
1165
      }
1166
      break;
×
1167
    } else {
1168
      doBuildResultDatablock(pOperator, &pExtW->binfo, &pExtW->groupResInfo, pExtW->aggSup.pResultBuf);
114✔
1169
      bool hasRemain = hasRemainResults(&pExtW->groupResInfo);
114✔
1170
      if (!hasRemain) {
114!
1171
        setOperatorCompleted(pOperator);
114✔
1172
        break;
114✔
1173
      }
1174
      if (pExtW->binfo.pRes->info.rows > 0) break;
×
1175
    }
1176
  }
1177

1178
  pOperator->resultInfo.totalRows += pExtW->binfo.pRes->info.rows;
114✔
1179
  
1180
_end:
114✔
1181

1182
  if (code != 0) {
114!
1183
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1184
    pTaskInfo->code = code;
×
1185
    T_LONG_JMP(pTaskInfo->env, code);
×
1186
  }
1187
  (*ppRes) = (pExtW->binfo.pRes->info.rows == 0) ? NULL : pExtW->binfo.pRes;
114!
1188

1189
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM && (*ppRes)) {
114!
1190
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
114✔
1191
  }
1192
  
1193
  return code;
114✔
1194
}
1195

1196
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
×
1197
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
1198
  if (*pResult == NULL) {
×
1199
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
×
1200
    if (!*pResult) {
×
1201
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
1202
      return terrno;
×
1203
    }
1204
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
×
1205
  }
1206
  (*pResult)->win = *pWin;
×
1207
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
×
1208
}
1209

1210
static int32_t doMergeAlignExtWindowAgg(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
×
1211
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
×
1212
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
×
1213

1214
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
1215
  SExprSupp*     pSup = &pOperator->exprSupp;
×
1216
  int32_t        code = 0;
×
1217

1218
  int32_t startPos = 0;
×
1219
  int64_t* tsCols = extractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
×
1220
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
×
1221

1222
  const STimeWindow *pWin = getExtWindow(pOperator, pExtW, ts);
×
1223
  if (pWin == NULL) {
×
1224
    qError("failed to get time window for ts:%" PRId64 ", index:%d, error:%s", ts, pExtW->primaryTsIndex, tstrerror(terrno));
×
1225
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_INVALID_PARA);
×
1226
  }
1227
  code = setSingleOutputTupleBuf(pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup);
×
1228
  if (code != 0 || pMlExtInfo->pResultRow == NULL) {
×
1229
    T_LONG_JMP(pTaskInfo->env, code);
×
1230
  }
1231

1232
  int32_t currPos = startPos;
×
1233
  pMlExtInfo->curTs = pWin->skey;
×
1234
  while (++currPos < pBlock->info.rows) {
×
1235
    if (tsCols[currPos] == pMlExtInfo->curTs) continue;
×
1236

1237
    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
×
1238
      pMlExtInfo->curTs, startPos, currPos, tsCols[currPos]); 
1239
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
×
1240
                                           currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
×
1241
    if (code != 0) {
×
1242
      T_LONG_JMP(pTaskInfo->env, code);
×
1243
    }
1244

1245
    finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
×
1246
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
×
1247

1248
    pWin = getExtNextWindow(pExtW, pTaskInfo);
×
1249
    if (!pWin) break;
×
1250
    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
×
1251
    incExtWinCurIdx(pOperator);
×
1252
    startPos = currPos;
×
1253
    code = setSingleOutputTupleBuf(pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup);
×
1254
    if (code != 0 || pMlExtInfo->pResultRow == NULL) {
×
1255
      T_LONG_JMP(pTaskInfo->env, code);
×
1256
    }
1257
    pMlExtInfo->curTs = pWin->skey;
×
1258
  }
1259

1260
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
×
1261
                                         currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
×
1262
  if (code != 0) {
×
1263
    T_LONG_JMP(pTaskInfo->env, code);
×
1264
  }
1265
  return code;
×
1266
}
1267

1268
static int32_t doMergeAlignExtWindowProject(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
1269
                                            SSDataBlock* pResultBlock) {
1270
  SExternalWindowOperator* pExtW = pOperator->info;
×
1271
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
×
1272
  int32_t code = projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
×
1273
                        GET_STM_RTINFO(pOperator->pTaskInfo));
×
1274
  return code;
×
1275
}
1276

1277
void doMergeAlignExternalWindow(SOperatorInfo* pOperator) {
×
1278
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
×
1279
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
×
1280
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
×
1281
  SResultRow*                          pResultRow = NULL;
×
1282
  int32_t                              code = 0;
×
1283
  SSDataBlock*                         pRes = pExtW->binfo.pRes;
×
1284
  SExprSupp*                           pSup = &pOperator->exprSupp;
×
1285
  int32_t                              lino = 0;
×
1286

1287
  while (1) {
×
1288
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
×
1289

1290
    if (pBlock == NULL) {
×
1291
      // close last time window
1292
      if (pMlExtInfo->curTs != INT64_MIN && EEXT_MODE_AGG == pExtW->mode) {
×
1293
        finalizeResultRows(pMlExtInfo->pExtW->aggSup.pResultBuf, &pExtW->binfo.resultRowInfo.cur, pSup, pRes, pTaskInfo);
×
1294
        resetResultRow(pMlExtInfo->pResultRow,pExtW->aggSup.resultRowSize - sizeof(SResultRow));
×
1295
      }
1296
      setOperatorCompleted(pOperator);
×
1297
      break;
×
1298
    }
1299

1300
    pRes->info.scanFlag = pBlock->info.scanFlag;
×
1301
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
×
1302
    QUERY_CHECK_CODE(code, lino, _end);
×
1303

1304
    printDataBlock(pBlock, __func__, "externalwindowAlign");
×
1305
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);
×
1306

1307

1308
    if (EEXT_MODE_SCALAR == pExtW->mode) {
×
1309
      code = doMergeAlignExtWindowProject(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes);
×
1310
    } else {
1311
      code = doMergeAlignExtWindowAgg(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes);
×
1312
    }
1313

1314
    // int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock);
1315
    // if (status == EXTERNAL_RETRIEVE_CONTINUE) {
1316
    //   continue;
1317
    // } else if (status == EXTERNAL_RETRIEVE_NEXT_WINDOW) {
1318
    //   pExtW->pOutputBlockListNode = NULL;
1319
    //   pExtW->outputWinId++;
1320
    //   incExtWinOutIdx(pOperator);
1321
    //   continue;
1322
    // } else if (status == EXTERNAL_RETRIEVE_DONE) {
1323
    //   // do nothing, just break
1324
    // }
1325
  }
1326
_end:
×
1327
  if (code != 0) {
×
1328
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1329
    pTaskInfo->code = code;
×
1330
    T_LONG_JMP(pTaskInfo->env, code);
×
1331
  }
1332
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc