• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4653

06 Aug 2025 01:23AM UTC coverage: 60.5% (-0.5%) from 60.986%
#4653

push

travis-ci

web-flow
fix: [TD-37190]: disable ignore_nodata_trigger when window type is not interval/sliding or period. (#32405)

138499 of 291677 branches covered (47.48%)

Branch coverage included in aggregate %.

52 of 106 new or added lines in 1 file covered. (49.06%)

1648 existing lines in 119 files now uncovered.

209836 of 284084 relevant lines covered (73.86%)

4732834.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.22
/source/libs/executor/src/externalwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "tdatablock.h"
20
#include "stream.h"
21
#include "filter.h"
22

23
typedef struct SBlockList {
24
  const SSDataBlock* pSrcBlock;
25
  SList*             pBlocks;
26
  int32_t            blockRowNumThreshold;
27
} SBlockList;
28

29
typedef struct SExternalWindowOperator {
30
  SOptrBasicInfo     binfo;
31
  SAggSupporter      aggSup;
32
  SExprSupp          scalarSupp;
33
  STimeWindowAggSupp twAggSup;
34
  SGroupResInfo      groupResInfo;
35
  int32_t            primaryTsIndex;
36
  EExtWinMode        mode;
37
  SArray*            pWins;
38
  SArray*            pOutputBlocks;  // for each window, we have a list of blocks
39
  // SArray*            pOffsetList;  // for each window
40
  int32_t            lastWinId;
41
  int32_t            outputWinId;
42
  SListNode*         pOutputBlockListNode;  // block index in block array used for output
43
  SSDataBlock*       pTmpBlock;
44
  SSDataBlock*       pEmptyInputBlock;
45
  SArray*            pPseudoColInfo;  
46
  bool               hasCountFunc;
47
  // SLimitInfo         limitInfo;  // limit info for each window
48
} SExternalWindowOperator;
49

50
static int32_t blockListInit(SBlockList* pBlockList, int32_t threshold) {
361✔
51
  pBlockList->pBlocks = tdListNew(sizeof(SSDataBlock*));
361✔
52
  if (!pBlockList->pBlocks) {
361!
53
    return terrno;
×
54
  }
55
  return 0;
361✔
56
}
57

58
static int32_t blockListAddBlock(SBlockList* pBlockList) {
×
59
  SSDataBlock* pRes = NULL;
×
60
  int32_t      code = createOneDataBlock(pBlockList->pSrcBlock, false, &pRes);
×
61
  if (code != 0) {
×
62
    return code;
×
63
  }
64
  code = blockDataEnsureCapacity(pRes, pBlockList->blockRowNumThreshold);
×
65
  if (code != 0) {
×
66
    blockDataDestroy(pRes);
×
67
    return code;
×
68
  }
69
  code = tdListAppend(pBlockList->pBlocks, &pRes);
×
70
  if (code != 0) {
×
71
    blockDataDestroy(pRes);
×
72
    return code;
×
73
  }
74
  return 0;
×
75
}
76

77
static int32_t blockListGetLastBlock(SBlockList* pBlockList, SSDataBlock** ppBlock) {
×
78
  SListNode* pNode = TD_DLIST_TAIL(pBlockList->pBlocks);
×
79
  int32_t    code = 0;
×
80
  *ppBlock = NULL;
×
81
  code = blockListAddBlock(pBlockList);
×
82
  if (0 == code) {
×
83
    pNode = TD_DLIST_TAIL(pBlockList->pBlocks);
×
84
    *ppBlock = *(SSDataBlock**)pNode->data;
×
85
  }
86
  return code;
×
87
}
88

89
static void blockListDestroy(void* p) {
361✔
90
  SBlockList* pBlockList = (SBlockList*)p;
361✔
91
  if (TD_DLIST_NELES(pBlockList->pBlocks) > 0) {
361!
92
    SListNode* pNode = TD_DLIST_HEAD(pBlockList->pBlocks);
×
93
    while (pNode) {
×
94
      SSDataBlock* pBlock = *(SSDataBlock**)pNode->data;
×
95
      blockDataDestroy(pBlock);
×
96
      pNode = pNode->dl_next_;
×
97
    }
98
  }
99
  taosMemoryFree(pBlockList->pBlocks);
361!
100
}
361✔
101

102
void destroyExternalWindowOperatorInfo(void* param) {
111✔
103
  if (NULL == param) {
111!
104
    return;
×
105
  }
106
  SExternalWindowOperator* pInfo = (SExternalWindowOperator*)param;
111✔
107
  cleanupBasicInfo(&pInfo->binfo);
111✔
108

109
  taosArrayDestroyEx(pInfo->pOutputBlocks, blockListDestroy);
111✔
110
  taosArrayDestroy(pInfo->pWins);
111✔
111
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
111✔
112
  cleanupGroupResInfo(&pInfo->groupResInfo);
111✔
113
 
114
  taosArrayDestroy(pInfo->pPseudoColInfo);
111✔
115
  blockDataDestroy(pInfo->pTmpBlock);
111✔
116
  blockDataDestroy(pInfo->pEmptyInputBlock);
111✔
117

118
  cleanupAggSup(&pInfo->aggSup);
111✔
119
  cleanupExprSupp(&pInfo->scalarSupp);
111✔
120

121
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
111✔
122

123
  taosMemoryFreeClear(pInfo);
111!
124
}
125

126
static int32_t doOpenExternalWindow(SOperatorInfo* pOperator);
127
static int32_t externalWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
128

129
typedef struct SMergeAlignedExternalWindowOperator {
130
  SExternalWindowOperator* pExtW;
131
  int64_t curTs;
132
  SSDataBlock* pPrefetchedBlock;
133
  SResultRow*  pResultRow;
134
} SMergeAlignedExternalWindowOperator;
135

136
void destroyMergeAlignedExternalWindowOperator(void* pOperator) {
×
137
  SMergeAlignedExternalWindowOperator* pMlExtInfo = (SMergeAlignedExternalWindowOperator*)pOperator;
×
138
  destroyExternalWindowOperatorInfo(pMlExtInfo->pExtW);
×
139
}
×
140

141
void doMergeAlignExternalWindow(SOperatorInfo* pOperator);
142

143
static int32_t mergeAlignedExternalWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
144
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
×
145
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
×
146
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
×
147
  int32_t                              code = 0;
×
148
  int32_t lino = 0;
×
149

150
  if (pOperator->status == OP_EXEC_DONE) {
×
151
    (*ppRes) = NULL;
×
152
    return TSDB_CODE_SUCCESS;
×
153
  }
154

155
  SSDataBlock* pRes = pExtW->binfo.pRes;
×
156
  blockDataCleanup(pRes);
×
157

158
  if (!pExtW->pWins) {
×
159
    size_t size = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
×
160
    pExtW->pWins = taosArrayInit(size, sizeof(STimeWindow));
×
161
    if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _end);
×
162
    // pExtW->pOffsetList = taosArrayInit(size, sizeof(SLimitInfo));
163
    // if (!pExtW->pOffsetList) QUERY_CHECK_CODE(terrno, lino, _end);
164

165
    for (int32_t i = 0; i < size; ++i) {
×
166
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
×
167
      STimeWindow win = {.skey = pParam->wstart, .ekey = pParam->wend};
×
168
      if (pTaskInfo->pStreamRuntimeInfo->funcInfo.triggerType != 1){  // 1 meams STREAM_TRIGGER_SLIDING
×
169
        win.ekey++;
×
170
      }
171
      TSDB_CHECK_NULL(taosArrayPush(pExtW->pWins, &win), code, lino, _end, terrno);
×
172
    }
173
    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
×
174
  }
175

176
  doMergeAlignExternalWindow(pOperator);
×
177
  size_t rows = pRes->info.rows;
×
178
  pOperator->resultInfo.totalRows += rows;
×
179
  (*ppRes) = (rows == 0) ? NULL : pRes;
×
180

181
_end:
×
182
  if (code != 0) {
×
183
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
184
    pTaskInfo->code = code;
×
185
    T_LONG_JMP(pTaskInfo->env, code);
×
186
  }
187
  return code;
×
188
}
189

190
int32_t resetMergeAlignedExternalWindowOperator(SOperatorInfo* pOperator) {
×
191
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
×
192
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
×
193
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
×
194
  SMergeAlignedIntervalPhysiNode * pPhynode = (SMergeAlignedIntervalPhysiNode*)pOperator->pPhyNode;
×
195
  pOperator->status = OP_NOT_OPENED;
×
196

197
  taosArrayDestroy(pExtW->pWins);
×
198
  pExtW->pWins = NULL;
×
199

200
  resetBasicOperatorState(&pExtW->binfo);
×
201
  pMlExtInfo->pResultRow = NULL;
×
202
  pMlExtInfo->curTs = INT64_MIN;
×
203
  if (pMlExtInfo->pPrefetchedBlock) blockDataCleanup(pMlExtInfo->pPrefetchedBlock);
×
204

205
  int32_t code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
×
206
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
207
                             &pTaskInfo->storageAPI.functionStore);
208
  if (code == 0) {
×
209
    colDataDestroy(&pExtW->twAggSup.timeWindowData);
×
210
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
×
211
  }
212
  return code;
×
213
}
214

215
int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode,
×
216
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOptrOut) {
217
  SMergeAlignedIntervalPhysiNode* pPhynode = (SMergeAlignedIntervalPhysiNode*)pNode;
×
218
  int32_t code = 0;
×
219
  int32_t lino = 0;
×
220
  SMergeAlignedExternalWindowOperator* pMlExtInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedExternalWindowOperator));
×
221
  SOperatorInfo*                       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
222

223
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
×
224
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
×
225
  }
226
  pOperator->pPhyNode = pNode;
×
227
  if (!pMlExtInfo || !pOperator) {
×
228
    code = terrno;
×
229
    goto _error;
×
230
  }
231

232
  pMlExtInfo->pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
×
233
  if (!pMlExtInfo->pExtW) {
×
234
    code = terrno;
×
235
    goto _error;
×
236
  }
237

238
  SExternalWindowOperator* pExtW = pMlExtInfo->pExtW;
×
239
  SExprSupp* pSup = &pOperator->exprSupp;
×
240
  pSup->hasWindowOrGroup = true;
×
241
  pMlExtInfo->curTs = INT64_MIN;
×
242

243
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
×
244
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
×
245
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
×
246
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
×
247

248
  // pExtW->limitInfo = (SLimitInfo){0};
249
  // initLimitInfo(pPhynode->window.node.pLimit, pPhynode->window.node.pSlimit, &pExtW->limitInfo);
250

251
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
252
  initResultSizeInfo(&pOperator->resultInfo, 512);
×
253

254
  int32_t num = 0;
×
255
  SExprInfo* pExprInfo = NULL;
×
256
  code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
×
257
  QUERY_CHECK_CODE(code, lino, _error);
×
258

259
  if (pExtW->mode == EEXT_MODE_AGG) {
×
260
    code = initAggSup(pSup, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
261
                      &pTaskInfo->storageAPI.functionStore);
262
    QUERY_CHECK_CODE(code, lino, _error);
×
263
  }
264

265
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
×
266
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
267
  initBasicInfo(&pExtW->binfo, pResBlock);
×
268

269
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
×
270
  code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
×
271
  QUERY_CHECK_CODE(code, lino, _error);
×
272
  setOperatorInfo(pOperator, "MergeAlignedExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, false, OP_NOT_OPENED, pMlExtInfo, pTaskInfo);
×
273
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedExternalWindowNext, NULL,
×
274
                                         destroyMergeAlignedExternalWindowOperator, optrDefaultBufFn, NULL,
275
                                         optrDefaultGetNextExtFn, NULL);
276
  setOperatorResetStateFn(pOperator, resetMergeAlignedExternalWindowOperator);
×
277

278
  code = appendDownstream(pOperator, &pDownstream, 1);
×
279
  QUERY_CHECK_CODE(code, lino, _error);
×
280
  *ppOptrOut = pOperator;
×
281
  return code;
×
282
_error:
×
283
  if (pMlExtInfo) destroyMergeAlignedExternalWindowOperator(pMlExtInfo);
×
284
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
285
  pTaskInfo->code = code;
×
286
  return code;
×
287
}
288

289
static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {
83✔
290
  SExternalWindowOperator* pExtW = pOperator->info;
83✔
291
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
83✔
292
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pOperator->pPhyNode;
83✔
293
  pOperator->status = OP_NOT_OPENED;
83✔
294

295
  resetBasicOperatorState(&pExtW->binfo);
83✔
296
  pExtW->outputWinId = 0;
83✔
297
  pExtW->lastWinId = -1;
83✔
298
  taosArrayDestroyEx(pExtW->pOutputBlocks, blockListDestroy);
83✔
299
  // taosArrayDestroy(pExtW->pOffsetList);
300
  taosArrayDestroy(pExtW->pWins);
83✔
301
  pExtW->pWins = NULL;
83✔
302
  pExtW->pOutputBlockListNode = NULL;
83✔
303
  pExtW->pOutputBlocks = NULL;
83✔
304
  // pExtW->pOffsetList = NULL;
305
  initResultSizeInfo(&pOperator->resultInfo, 512);
83✔
306
  int32_t code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
83✔
307
  if (code == 0) {
83!
308
    code = resetAggSup(&pOperator->exprSupp, &pExtW->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
83✔
309
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
83✔
310
                       &pTaskInfo->storageAPI.functionStore);
311
  }
312
  if (code == 0) {
83!
313
    code = resetExprSupp(&pExtW->scalarSupp, pTaskInfo, pPhynode->window.pProjs, NULL,
83✔
314
                         &pTaskInfo->storageAPI.functionStore);
315
  }
316
  if (code == 0) {
83!
317
    colDataDestroy(&pExtW->twAggSup.timeWindowData);
83✔
318
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
83✔
319
  }
320
  if (code == 0) {
83!
321
    cleanupGroupResInfo(&pExtW->groupResInfo);
83✔
322
  }
323
  blockDataDestroy(pExtW->pTmpBlock);
83✔
324
  pExtW->pTmpBlock = NULL;
83✔
325
  return code;
83✔
326
}
327

328
static EDealRes extWindowHasCountLikeFunc(SNode* pNode, void* res) {
724✔
329
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
724✔
330
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
264✔
331
    if (fmIsCountLikeFunc(pFunc->funcId) || (pFunc->hasOriginalFunc && fmIsCountLikeFunc(pFunc->originalFuncId))) {
264!
332
      *(bool*)res = true;
110✔
333
      return DEAL_RES_END;
110✔
334
    }
335
  }
336
  return DEAL_RES_CONTINUE;
614✔
337
}
338

339

340
static int32_t extWindowCreateEmptyInputBlock(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
110✔
341
  int32_t code = TSDB_CODE_SUCCESS;
110✔
342
  int32_t lino = 0;
110✔
343
  SSDataBlock* pBlock = NULL;
110✔
344
  if (!tsCountAlwaysReturnValue) {
110!
345
    return TSDB_CODE_SUCCESS;
×
346
  }
347

348
  SExternalWindowOperator* pExtW = pOperator->info;
110✔
349

350
  if (!pExtW->hasCountFunc) {
110!
351
    return TSDB_CODE_SUCCESS;
×
352
  }
353

354
  code = createDataBlock(&pBlock);
110✔
355
  if (code) {
110!
356
    return code;
×
357
  }
358

359
  pBlock->info.rows = 1;
110✔
360
  pBlock->info.capacity = 0;
110✔
361

362
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
506✔
363
    SColumnInfoData colInfo = {0};
396✔
364
    colInfo.hasNull = true;
396✔
365
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
396✔
366
    colInfo.info.bytes = 1;
396✔
367

368
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
396✔
369
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
844✔
370
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
448✔
371
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
448✔
372
        int32_t slotId = pFuncParam->pCol->slotId;
317✔
373
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
317✔
374
        if (slotId >= numOfCols) {
317✔
375
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
200✔
376
          QUERY_CHECK_CODE(code, lino, _end);
200!
377

378
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
400✔
379
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
200✔
380
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
200!
381
          }
382
        }
383
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
131✔
384
        // do nothing
385
      }
386
    }
387
  }
388

389
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
110✔
390
  QUERY_CHECK_CODE(code, lino, _end);
110!
391

392
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
310✔
393
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
200✔
394
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
200!
395
    colDataSetNULL(pColInfoData, 0);
396
  }
397
  *ppBlock = pBlock;
110✔
398

399
_end:
110✔
400
  if (code != TSDB_CODE_SUCCESS) {
110!
401
    blockDataDestroy(pBlock);
×
402
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
403
  }
404
  return code;
110✔
405
}
406

407
int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo,
111✔
408
                                     SOperatorInfo** pOptrOut) {
409
  SExternalWindowPhysiNode* pPhynode = (SExternalWindowPhysiNode*)pNode;
111✔
410
  QRY_PARAM_CHECK(pOptrOut);
111!
411
  int32_t                  code = 0;
111✔
412
  int32_t                  lino = 0;
111✔
413
  SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator));
111!
414
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
111!
415
  pOperator->pPhyNode = pNode;
111✔
416
  if (!pExtW || !pOperator) {
111!
417
    code = terrno;
×
418
    lino = __LINE__;
×
419
    goto _error;
×
420
  }
421
  
422
  setOperatorInfo(pOperator, "ExternalWindowOperator", QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, true, OP_NOT_OPENED,
111✔
423
                  pExtW, pTaskInfo);
424
                  
425
  if (pTaskInfo->pStreamRuntimeInfo != NULL){
111!
426
    pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow = true;
111✔
427
  }
428
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhynode->window.node.pOutputDataBlockDesc);
111✔
429
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
111!
430
  initBasicInfo(&pExtW->binfo, pResBlock);
111✔
431

432
  pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
111✔
433
  pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
111!
434
  pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
111✔
435
  pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
111✔
436

437
  // pExtW->limitInfo = (SLimitInfo){0};
438
  // initLimitInfo(pPhynode->window.node.pLimit, pPhynode->window.node.pSlimit, &pExtW->limitInfo);
439

440
  if (pPhynode->window.pProjs) {
111!
441
    int32_t    numOfScalarExpr = 0;
×
442
    SExprInfo* pScalarExprInfo = NULL;
×
443
    code = createExprInfo(pPhynode->window.pProjs, NULL, &pScalarExprInfo, &numOfScalarExpr);
×
444
    QUERY_CHECK_CODE(code, lino, _error);
×
445

446
    code = initExprSupp(&pExtW->scalarSupp, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
×
447
    QUERY_CHECK_CODE(code, lino, _error);
×
448
  } else if (pExtW->mode == EEXT_MODE_AGG) {
111!
449
    if (pPhynode->window.pExprs != NULL) {
111!
450
      int32_t    num = 0;
×
451
      SExprInfo* pSExpr = NULL;
×
452
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
453
      QUERY_CHECK_CODE(code, lino, _error);
×
454
    
455
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
456
      if (code != TSDB_CODE_SUCCESS) {
×
457
        goto _error;
×
458
      }
459
    }
460
    
461
    size_t keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
111✔
462
    initResultSizeInfo(&pOperator->resultInfo, 512);
111✔
463
    code = blockDataEnsureCapacity(pExtW->binfo.pRes, pOperator->resultInfo.capacity);
111✔
464
    QUERY_CHECK_CODE(code, lino, _error);
111!
465

466
    int32_t num = 0;
111✔
467
    SExprInfo* pExprInfo = NULL;
111✔
468
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &num);
111✔
469
    QUERY_CHECK_CODE(code, lino, _error);
111!
470
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, 0, 0);
111✔
471
    QUERY_CHECK_CODE(code, lino, _error);
111!
472

473
    nodesWalkExprs(pPhynode->window.pFuncs, extWindowHasCountLikeFunc, &pExtW->hasCountFunc);
111✔
474
    if (pExtW->hasCountFunc) {
111✔
475
      code = extWindowCreateEmptyInputBlock(pOperator, &pExtW->pEmptyInputBlock);
110✔
476
      QUERY_CHECK_CODE(code, lino, _error);
110!
477
      qDebug("%s ext window has CountLikeFunc", pOperator->pTaskInfo->id.str);
110✔
478
    } else {
479
      qDebug("%s ext window doesn't have CountLikeFunc", pOperator->pTaskInfo->id.str);
1!
480
    }
481

482
    code = initExecTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pTaskInfo->window);
111✔
483
    QUERY_CHECK_CODE(code, lino, _error);
111!
484
  } else {
485
    size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
486
    
487
    if (pPhynode->window.pExprs != NULL) {
×
488
      int32_t    num = 0;
×
489
      SExprInfo* pSExpr = NULL;
×
490
      code = createExprInfo(pPhynode->window.pExprs, NULL, &pSExpr, &num);
×
491
      QUERY_CHECK_CODE(code, lino, _error);
×
492
    
493
      code = initExprSupp(&pExtW->scalarSupp, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
×
494
      if (code != TSDB_CODE_SUCCESS) {
×
495
        goto _error;
×
496
      }
497
    }
498
    
499
    initResultSizeInfo(&pOperator->resultInfo, 512);
×
500
    code = blockDataEnsureCapacity(pResBlock, 512);
×
501
    TSDB_CHECK_CODE(code, lino, _error);
×
502
    
503
    int32_t    numOfExpr = 0;
×
504
    SExprInfo* pExprInfo = NULL;
×
505
    code = createExprInfo(pPhynode->window.pFuncs, NULL, &pExprInfo, &numOfExpr);
×
506
    TSDB_CHECK_CODE(code, lino, _error);
×
507
    
508
    code = initAggSup(&pOperator->exprSupp, &pExtW->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
×
509
                              pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
510
    TSDB_CHECK_CODE(code, lino, _error);
×
511
    pOperator->exprSupp.hasWindowOrGroup = false;
×
512
    
513
    code = setFunctionResultOutput(pOperator, &pExtW->binfo, &pExtW->aggSup, MAIN_SCAN, numOfExpr);
×
514
    TSDB_CHECK_CODE(code, lino, _error);
×
515
    
516
    code = filterInitFromNode((SNode*)pNode->pConditions, &pOperator->exprSupp.pFilterInfo, 0,
×
517
                              pTaskInfo->pStreamRuntimeInfo);
×
518
    TSDB_CHECK_CODE(code, lino, _error);
×
519
    
520
    pExtW->binfo.pRes = pResBlock;
×
521
    pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
×
522
    pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
×
523
    code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
×
524
    TSDB_CHECK_CODE(code, lino, _error);
×
525
  }
526

527
  initResultRowInfo(&pExtW->binfo.resultRowInfo);
111✔
528

529
  pOperator->fpSet = createOperatorFpSet(doOpenExternalWindow, externalWindowNext, NULL, destroyExternalWindowOperatorInfo,
111✔
530
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
531
  setOperatorResetStateFn(pOperator, resetExternalWindowOperator);
111✔
532
  code = appendDownstream(pOperator, &pDownstream, 1);
111✔
533
  if (code != 0) {
111!
534
    goto _error;
×
535
  }
536

537
  *pOptrOut = pOperator;
111✔
538
  return code;
111✔
539

540
_error:
×
541

542
  if (pExtW != NULL) {
×
543
    destroyExternalWindowOperatorInfo(pExtW);
×
544
  }
545

546
  destroyOperatorAndDownstreams(pOperator, &pDownstream, 1);
×
547
  pTaskInfo->code = code;
×
548
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
×
549
  return code;
×
550
}
551

552
int64_t* extractTsCol(SSDataBlock* pBlock, int32_t primaryTsIndex, SExecTaskInfo* pTaskInfo) {
310✔
553
  TSKEY* tsCols = NULL;
310✔
554

555
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
310!
556
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsIndex);
310✔
557
    if (!pColDataInfo) {
310!
558
      pTaskInfo->code = terrno;
×
559
      T_LONG_JMP(pTaskInfo->env, terrno);
×
560
    }
561

562
    tsCols = (int64_t*)pColDataInfo->pData;
310✔
563
    if (tsCols[0] == 0) {
310!
564
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
×
565
            tsCols[pBlock->info.rows - 1]);
566
    }
567

568
    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
310!
569
      int32_t code = blockDataUpdateTsWindow(pBlock, primaryTsIndex);
310✔
570
      if (code != TSDB_CODE_SUCCESS) {
310!
571
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
572
        pTaskInfo->code = code;
×
573
        T_LONG_JMP(pTaskInfo->env, code);
×
574
      }
575
    }
576
  }
577

578
  return tsCols;
310✔
579
}
580

581
static int32_t getExtWinCurIdx(SExecTaskInfo* pTaskInfo) {
1,637✔
582
  return pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
1,637✔
583
}
584

585
static void incExtWinCurIdx(SOperatorInfo* pOperator) {
210✔
586
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
210✔
587
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx++;
210✔
588
}
210✔
589

590
static void setExtWinCurIdx(SOperatorInfo* pOperator, int32_t idx) {
860✔
591
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
860✔
592
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = idx;
860✔
593
}
860✔
594

595

596
static void incExtWinOutIdx(SOperatorInfo* pOperator) {
×
597
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
598
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curOutIdx++;
×
599
}
×
600

601
static const STimeWindow* getExtWindow(SOperatorInfo* pOperator, SExternalWindowOperator* pExtW, TSKEY ts) {
310✔
602
  // TODO handle desc order
603
  for (int32_t i = 0; i < pExtW->pWins->size; ++i) {
816!
604
    const STimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
816✔
605
    if (ts >= pWin->skey && ts < pWin->ekey) {
816!
606
      setExtWinCurIdx(pOperator, i);
310✔
607
      return pWin;
310✔
608
    }
609
  }
610
  return NULL;
×
611
}
612

613
static const STimeWindow* getExtNextWindow(SExternalWindowOperator* pExtW, SExecTaskInfo* pTaskInfo) {
520✔
614
  int32_t curIdx = getExtWinCurIdx(pTaskInfo);
520✔
615
  if (curIdx + 1 >= pExtW->pWins->size) return NULL;
520✔
616
  return taosArrayGet(pExtW->pWins, curIdx + 1);
339✔
617
}
618

619
static int32_t getNextStartPos(STimeWindow win, const SDataBlockInfo* pBlockInfo, int32_t lastEndPos, int32_t order, int32_t* nextPos, int64_t* tsCol) {
339✔
620
  bool ascQuery = order == TSDB_ORDER_ASC;
339✔
621

622
  if (win.ekey <= pBlockInfo->window.skey && ascQuery) {
339!
623
    return -2;
×
624
  }
625
//if (win.skey > pBlockInfo->window.ekey && !ascQuery) return -2;
626

627
  if (win.skey > pBlockInfo->window.ekey && ascQuery) return -1;
339!
628
//if (win.ekey < pBlockInfo->window.skey && !ascQuery) return -1;
629

630
  while (true) {
631
    if (win.ekey <= tsCol[lastEndPos + 1] && ascQuery) return -2;
210!
632
    if (win.skey <= tsCol[lastEndPos + 1] && ascQuery) break;
193!
633
    lastEndPos++;
×
634
  }
635

636
  *nextPos = lastEndPos + 1;
193✔
637
  return 0;
193✔
638
}
639

640
static int32_t setExtWindowOutputBuf(SResultRowInfo* pResultRowInfo, const STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
550✔
641
                                     int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
642
                                     SExecTaskInfo* pTaskInfo) {
643
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
550✔
644
                                                  true, tableGroupId, pTaskInfo, true, pAggSup, true);
645

646
  if (pResultRow == NULL || pTaskInfo->code != 0) {
550!
647
    *pResult = NULL;
×
648
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
649
    return pTaskInfo->code;
×
650
  }
651

652
  qDebug("current result rows num:%d", tSimpleHashGetSize(pAggSup->pResultRowHashTable));
550✔
653

654
  // set time window for current result
655
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
550✔
656
  *pResult = pResultRow;
550✔
657
  pTaskInfo->code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
550✔
658
  if (pTaskInfo->code) {
550!
659
    *pResult = NULL;
×
660
    qError("failed to set result row ctx, error:%s", tstrerror(pTaskInfo->code));
×
661
    return pTaskInfo->code;
×
662
  }
663

664
  return TSDB_CODE_SUCCESS;
550✔
665
}
666

667
static int32_t extWindowDoHashAgg(SOperatorInfo* pOperator, int32_t startPos, int32_t forwardRows,
550✔
668
                                  SSDataBlock* pInputBlock) {
669
  if (forwardRows == 0) return 0;
550!
670
  SExprSupp*               pSup = &pOperator->exprSupp;
550✔
671
  SExternalWindowOperator* pExtW = pOperator->info;
550✔
672
  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
550✔
673
                                         forwardRows, pInputBlock->info.rows, pSup->numOfExprs);
550✔
674
}
675

676
static SSDataBlock* extWindowGetOutputBlock(SOperatorInfo* pOperator, int32_t winIdx) {
×
677
  SExternalWindowOperator* pExtW = pOperator->info;
×
678
  SSDataBlock*             pBlock = NULL;
×
679
  SBlockList*              pBlockList = taosArrayGet(pExtW->pOutputBlocks, winIdx);
×
680
  int32_t                  code = blockListGetLastBlock(pBlockList, &pBlock);
×
681
  if (code != 0) terrno = code;
×
682
  return pBlock;
×
683
}
684

685
static int32_t extWindowCopyRows(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos,
×
686
                                 int32_t forwardRows) {
687
  if (forwardRows == 0) return 0;
×
688
  SExternalWindowOperator* pExtW = pOperator->info;
×
689
  SSDataBlock*             pResBlock = extWindowGetOutputBlock(pOperator, getExtWinCurIdx(pOperator->pTaskInfo));
×
690
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
×
691

692
  if (!pResBlock) {
×
693
    qError("%s failed to get output block for ext window:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
694
    return terrno;
×
695
  }
696
  int32_t rowsToCopy = forwardRows;
×
697
  int32_t code = 0;
×
698
  if (!pExtW->pTmpBlock)
×
699
    code = createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock);
×
700
  else
701
    blockDataCleanup(pExtW->pTmpBlock);
×
702
  if (code) {
×
703
    qError("%s failed to create datablock:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
704
    return code;
×
705
  }
706
  code = blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rowsToCopy));
×
707
  if (code) {
×
708
    qError("%s failed to ensure capacity:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
709
    return code;
×
710
  }
711
  if (rowsToCopy > 0) {
×
712
    code = blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rowsToCopy);
×
713
    if (code == 0) {
×
714
      code = projectApplyFunctionsWithSelect(pExprSup->pExprInfo, pResBlock, pExtW->pTmpBlock, pExprSup->pCtx, pExprSup->numOfExprs,
×
715
          NULL, GET_STM_RTINFO(pOperator->pTaskInfo), true);
×
716
    }
717
  }
718

719
  if (code != 0) return code;
×
720

721
  return code;
×
722
}
723

724
static int32_t hashExternalWindowProject(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
×
725
  SExprSupp*               pSup = &pOperator->exprSupp;
×
726
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
×
727
  SExternalWindowOperator* pExtW = pOperator->info;
×
728
  SqlFunctionCtx*          pCtx = NULL;
×
729
  int64_t*                 tsCol = extractTsCol(pInputBlock, pExtW->primaryTsIndex, pTaskInfo);
×
730
  TSKEY                    ts = getStartTsKey(&pInputBlock->info.window, tsCol);
×
731
  const STimeWindow*       pWin = getExtWindow(pOperator, pExtW, ts);
×
732
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
×
733
  int32_t                  tsOrder = pExtW->binfo.inputTsOrder;
×
734
  int32_t startPos = 0;
×
735

736
  if (!pWin) return 0;
×
737
  TSKEY   ekey = ascScan ? pWin->ekey : pWin->skey;
×
738
  int32_t forwardRows =
739
      getNumOfRowsInTimeWindow(&pInputBlock->info, tsCol, startPos, ekey-1, binarySearchForKey, NULL, tsOrder);
×
740

741
  int32_t code = extWindowCopyRows(pOperator, pInputBlock, startPos, forwardRows);
×
742

743
  while (code == 0 && pInputBlock->info.rows > startPos + forwardRows) {
×
744
    pWin = getExtNextWindow(pExtW, pTaskInfo);
×
745
    if (!pWin) break;
×
746
    incExtWinCurIdx(pOperator);
×
747

748
    startPos = startPos + forwardRows;
×
749
    ekey = ascScan ? pWin->ekey : pWin->skey;
×
750
    forwardRows =
751
        getNumOfRowsInTimeWindow(&pInputBlock->info, tsCol, startPos, ekey-1, binarySearchForKey, NULL, tsOrder);
×
752
    code = extWindowCopyRows(pOperator, pInputBlock, startPos, forwardRows);
×
753
  }
754
  return code;
×
755
}
756

757
static int32_t extWindowDoIndefRows(SOperatorInfo* pOperator, SSDataBlock* pRes, SSDataBlock* pBlock) {
×
758
  SExternalWindowOperator* pExtW = pOperator->info;
×
759
  SOptrBasicInfo*     pInfo = &pExtW->binfo;
×
760
  SExprSupp*          pSup = &pOperator->exprSupp;
×
761
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
×
762
  int32_t order = pInfo->inputTsOrder;
×
763
  int32_t scanFlag = pBlock->info.scanFlag;
×
764
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
×
765

766
  SExprSupp* pScalarSup = &pExtW->scalarSupp;
×
767
  if (pScalarSup->pExprInfo != NULL) {
×
768
    TAOS_CHECK_EXIT(projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
769
                                 pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
770
  }
771

772
  TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, order, scanFlag, false));
×
773

774
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows));
×
775

776
  TAOS_CHECK_EXIT(projectApplyFunctions(pSup->pExprInfo, pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
×
777
                               pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo)));
778

779
_exit:
×
780

781
  if (code) {
×
782
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
783
  }
784

785
  return code;
×
786
}
787

788

789
static int32_t extWindowHandleIndefRows(SOperatorInfo* pOperator, SSDataBlock* pInputBlock, int32_t startPos,
×
790
                                 int32_t forwardRows) {
791
  if (forwardRows == 0) return 0;
×
792
  SExternalWindowOperator* pExtW = pOperator->info;
×
793
  SSDataBlock*             pResBlock = extWindowGetOutputBlock(pOperator, getExtWinCurIdx(pOperator->pTaskInfo));
×
794

795
  if (!pResBlock) {
×
796
    qError("%s failed to get output block for ext window:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
797
    return terrno;
×
798
  }
799
  
800
  int32_t rowsToCopy = forwardRows;
×
801
  int32_t code = 0;
×
802
  if (!pExtW->pTmpBlock)
×
803
    code = createOneDataBlock(pInputBlock, false, &pExtW->pTmpBlock);
×
804
  else
805
    blockDataCleanup(pExtW->pTmpBlock);
×
806
    
807
  if (code) {
×
808
    qError("%s failed to create datablock:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
809
    return code;
×
810
  }
811
  code = blockDataEnsureCapacity(pExtW->pTmpBlock, TMAX(1, rowsToCopy));
×
812
  if (code) {
×
813
    qError("%s failed to ensure capacity:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(terrno));
×
814
    return code;
×
815
  }
816
  
817
  if (rowsToCopy > 0) {
×
818
    code = blockDataMergeNRows(pExtW->pTmpBlock, pInputBlock, startPos, rowsToCopy);
×
819
    if (code == 0) {
×
820
      code = extWindowDoIndefRows(pOperator, pResBlock, pExtW->pTmpBlock);
×
821
    }
822
  }
823

824
  return code;
×
825
}
826

827

828
static int32_t hashExternalWindowIndefRows(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
×
829
  SExprSupp*               pSup = &pOperator->exprSupp;
×
830
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
×
831
  SExternalWindowOperator* pExtW = pOperator->info;
×
832
  SqlFunctionCtx*          pCtx = NULL;
×
833
  int64_t*                 tsCol = extractTsCol(pInputBlock, pExtW->primaryTsIndex, pTaskInfo);
×
834
  TSKEY                    ts = getStartTsKey(&pInputBlock->info.window, tsCol);
×
835
  const STimeWindow*       pWin = getExtWindow(pOperator, pExtW, ts);
×
836
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
×
837
  int32_t                  tsOrder = pExtW->binfo.inputTsOrder;
×
838
  int32_t                  startPos = 0;
×
839

840
  if (!pWin) return 0;
×
841
  TSKEY   ekey = ascScan ? pWin->ekey : pWin->skey;
×
842
  int32_t forwardRows =
843
      getNumOfRowsInTimeWindow(&pInputBlock->info, tsCol, startPos, ekey-1, binarySearchForKey, NULL, tsOrder);
×
844

845
  int32_t code = extWindowHandleIndefRows(pOperator, pInputBlock, startPos, forwardRows);
×
846

847
  while (code == 0 && pInputBlock->info.rows > startPos + forwardRows) {
×
848
    pWin = getExtNextWindow(pExtW, pTaskInfo);
×
849
    if (!pWin) break;
×
850
    incExtWinCurIdx(pOperator);
×
851

852
    startPos = startPos + forwardRows;
×
853
    ekey = ascScan ? pWin->ekey : pWin->skey;
×
854
    forwardRows =
855
        getNumOfRowsInTimeWindow(&pInputBlock->info, tsCol, startPos, ekey-1, binarySearchForKey, NULL, tsOrder);
×
856
    code = extWindowHandleIndefRows(pOperator, pInputBlock, startPos, forwardRows);
×
857
  }
858
  
859
  return code;
×
860
}
861

862

863
static int32_t extWindowProcessEmptyWins(SOperatorInfo* pOperator, SSDataBlock* pBlock, bool allRemains) {
614✔
864
  int32_t code = 0, lino = 0;
614✔
865
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
614✔
866
  bool ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
614✔
867
  int32_t currIdx = getExtWinCurIdx(pOperator->pTaskInfo);
614✔
868
  SExprSupp* pSup = &pOperator->exprSupp;
614✔
869

870
  if (NULL == pExtW->pEmptyInputBlock) {
614✔
871
    goto _exit;
3✔
872
  }
873

874
  int32_t endIdx = allRemains ? (pExtW->pWins->size - 1) : (currIdx - 1);
611✔
875
  SResultRowInfo* pResultRowInfo = &pExtW->binfo.resultRowInfo;
611✔
876
  SResultRow* pResult = NULL;
611✔
877
  SSDataBlock* pInput = pExtW->pEmptyInputBlock;
611✔
878

879
  if ((pExtW->lastWinId + 1) <= endIdx) {
611✔
880
    TAOS_CHECK_EXIT(setInputDataBlock(pSup, pExtW->pEmptyInputBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
30!
881
  }
882
  
883
  for (int32_t i = pExtW->lastWinId + 1; i <= endIdx; ++i) {
658✔
884
    const STimeWindow* pWin = taosArrayGet(pExtW->pWins, i);
47✔
885

886
    setExtWinCurIdx(pOperator, i);
47✔
887
    qDebug("%s %dth ext empty window start:%" PRId64 ", end:%" PRId64 ", ascScan:%d",
47✔
888
           GET_TASKID(pOperator->pTaskInfo), i, pWin->skey, pWin->ekey, ascScan);
889

890
    TAOS_CHECK_EXIT(setExtWindowOutputBuf(pResultRowInfo, pWin, &pResult, pInput->info.id.groupId, pSup->pCtx, pSup->numOfExprs,
47!
891
                                pSup->rowEntryInfoOffset, &pExtW->aggSup, pOperator->pTaskInfo));
892

893
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, pWin, 1);
47✔
894
    code = extWindowDoHashAgg(pOperator, 0, 1, pInput);
47✔
895
    pExtW->lastWinId = i;  
47✔
896
    TAOS_CHECK_EXIT(code);
47!
897
  }
898

899
  
900
_exit:
611✔
901

902
  if (code) {
614!
903
    qError("%s %s failed at line %d since %s", pOperator->pTaskInfo->id.str, __FUNCTION__, lino, tstrerror(code));
×
904
  } else {
905
    if (pBlock) {
614✔
906
      TAOS_CHECK_EXIT(setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, MAIN_SCAN, true));
503!
907
    }
908

909
    if (!allRemains) {
614✔
910
      setExtWinCurIdx(pOperator, currIdx);  
503✔
911
    }
912
  }
913

914
  return code;
614✔
915
}
916

917
static void hashExternalWindowAgg(SOperatorInfo* pOperator, SSDataBlock* pInputBlock) {
310✔
918
  SExternalWindowOperator* pExtW = (SExternalWindowOperator*)pOperator->info;
310✔
919
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
310✔
920
  SExprSupp*               pSup = &pOperator->exprSupp;
310✔
921
  SResultRowInfo*          pResultRowInfo = &pExtW->binfo.resultRowInfo;
310✔
922
  int32_t                  startPos = 0;
310✔
923
  int32_t                  numOfOutput = pSup->numOfExprs;
310✔
924
  int64_t*                 tsCols = extractTsCol(pInputBlock, pExtW->primaryTsIndex, pTaskInfo);
310✔
925
  bool                     ascScan = pExtW->binfo.inputTsOrder == TSDB_ORDER_ASC;
310✔
926
  TSKEY                    ts = getStartTsKey(&pInputBlock->info.window, tsCols);
310✔
927
  SResultRow*              pResult = NULL;
310✔
928
  int32_t                  ret = 0;
310✔
929

930
  const STimeWindow* pWin = getExtWindow(pOperator, pExtW, ts);
310✔
931
  if (pWin == NULL) {
310!
932
    qError("%s failed to get time window for ts:%" PRId64 ", error:%s", GET_TASKID(pOperator->pTaskInfo), ts, tstrerror(terrno));
×
933
    return;
×
934
  }
935

936
  ret = extWindowProcessEmptyWins(pOperator, pInputBlock, false);
310✔
937
  if (ret != 0) {
310!
938
    T_LONG_JMP(pTaskInfo->env, ret);
×
939
  }
940
  
941
  qDebug("%s ext window1 start:%" PRId64 ", end:%" PRId64 ", ts:%" PRId64 ", ascScan:%d",
310✔
942
         GET_TASKID(pOperator->pTaskInfo), pWin->skey, pWin->ekey, ts, ascScan);        
943

944
  SExprSupp* pScalarSup = &pExtW->scalarSupp;
310✔
945
  if (pScalarSup->pExprInfo != NULL) {
310!
946
    ret = projectApplyFunctions(pScalarSup->pExprInfo, pInputBlock, pInputBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
947
                                 pExtW->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
×
948
    if (ret != 0) {
×
949
      T_LONG_JMP(pTaskInfo->env, ret);
×
950
    }
951
  }
952

953
  STimeWindow win = *pWin;
310✔
954
  ret = setExtWindowOutputBuf(pResultRowInfo, &win, &pResult, pInputBlock->info.id.groupId, pSup->pCtx, numOfOutput,
310✔
955
                              pSup->rowEntryInfoOffset, &pExtW->aggSup, pTaskInfo);
956
  if (ret != 0 || !pResult) {
310!
957
    T_LONG_JMP(pTaskInfo->env, ret);
×
958
  }
959

960
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
310!
961
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pInputBlock->info, tsCols, startPos, ekey - 1, binarySearchForKey, NULL,
310✔
962
                                                 pExtW->binfo.inputTsOrder);
963

964
  updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &win, 1);
310✔
965
  ret = extWindowDoHashAgg(pOperator, startPos, forwardRows, pInputBlock);
310✔
966
  pExtW->lastWinId = getExtWinCurIdx(pTaskInfo);
310✔
967

968
  if (ret != 0) {
310!
969
    T_LONG_JMP(pTaskInfo->env, ret);
×
970
  }
971

972
  int32_t nextPosGot = 0;
310✔
973
  while (1) {
210✔
974
    int32_t prevEndPos = forwardRows + startPos - 1;
520✔
975
    if (prevEndPos >= pInputBlock->info.rows) {
520!
976
      break;
×
977
    }
978
    
979
    pWin = getExtNextWindow(pExtW, pTaskInfo);
520✔
980
    if (!pWin)
520✔
981
      break;
181✔
982
    else
983
      win = *pWin;
339✔
984
      
985
    qDebug("%s ext window2 start:%" PRId64 ", end:%" PRId64 ", ts:%" PRId64 ", ascScan:%d",
339✔
986
           GET_TASKID(pOperator->pTaskInfo), win.skey, win.ekey, ts, ascScan);
987
           
988
    nextPosGot = getNextStartPos(win, &pInputBlock->info, prevEndPos, pExtW->binfo.inputTsOrder, &startPos, tsCols);
339✔
989
    if (-1 == nextPosGot) {
339✔
990
      qDebug("%s ignore current block", GET_TASKID(pOperator->pTaskInfo));
129!
991
      break;
129✔
992
    }
993
    if (-2 == nextPosGot) {
210✔
994
      qDebug("%s skip current window", GET_TASKID(pOperator->pTaskInfo));
17!
995
      incExtWinCurIdx(pOperator);
17✔
996
      continue;
17✔
997
    }
998
    
999
    incExtWinCurIdx(pOperator);
193✔
1000

1001
    ret = extWindowProcessEmptyWins(pOperator, pInputBlock, false);
193✔
1002
    if (ret != 0) {
193!
1003
      T_LONG_JMP(pTaskInfo->env, ret);
×
1004
    }
1005

1006
    ekey = ascScan ? win.ekey : win.skey;
193!
1007
    forwardRows = getNumOfRowsInTimeWindow(&pInputBlock->info, tsCols, startPos, ekey - 1, binarySearchForKey, NULL,
193✔
1008
                                           pExtW->binfo.inputTsOrder);
1009

1010
    ret = setExtWindowOutputBuf(pResultRowInfo, &win, &pResult, pInputBlock->info.id.groupId, pSup->pCtx,
193✔
1011
                                numOfOutput, pSup->rowEntryInfoOffset, &pExtW->aggSup, pTaskInfo);
1012
    if (ret != 0 || !pResult) T_LONG_JMP(pTaskInfo->env, ret);
193!
1013

1014
    updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &win, 1);
193✔
1015
    ret = extWindowDoHashAgg(pOperator, startPos, forwardRows, pInputBlock);
193✔
1016
    pExtW->lastWinId = getExtWinCurIdx(pTaskInfo);  
193✔
1017
    if (ret != 0) {
193!
1018
      T_LONG_JMP(pTaskInfo->env, ret);
×
1019
    }
1020
  }
1021
}
1022

1023

1024
static int32_t doOpenExternalWindow(SOperatorInfo* pOperator) {
114✔
1025
  if (OPTR_IS_OPENED(pOperator)) return TSDB_CODE_SUCCESS;
114!
1026
  int32_t                  code = 0;
114✔
1027
  int32_t                  lino = 0;
114✔
1028
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
114✔
1029
  SOperatorInfo*           pDownstream = pOperator->pDownstream[0];
114✔
1030
  SExternalWindowOperator* pExtW = pOperator->info;
114✔
1031
  SExprSupp*               pSup = &pOperator->exprSupp;
114✔
1032
  pTaskInfo->pStreamRuntimeInfo->funcInfo.extWinProjMode = (pExtW->mode != EEXT_MODE_AGG);
114✔
1033

1034
  int32_t scanFlag = MAIN_SCAN;
114✔
1035
  int64_t st = taosGetTimestampUs();
114✔
1036

1037
  if (!pExtW->pWins) {
114!
1038
    size_t size = taosArrayGetSize(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals);
114✔
1039
    pExtW->pWins = taosArrayInit(size, sizeof(STimeWindow));
114✔
1040
    if (!pExtW->pWins) QUERY_CHECK_CODE(terrno, lino, _exit);
114!
1041
    pExtW->pOutputBlocks = taosArrayInit(size, sizeof(SBlockList));
114✔
1042
    if (!pExtW->pOutputBlocks) QUERY_CHECK_CODE(terrno, lino, _exit);
114!
1043
    // pExtW->pOffsetList = taosArrayInit(size, sizeof(SLimitInfo));
1044
    // if (!pExtW->pOffsetList) QUERY_CHECK_CODE(terrno, lino, _end);
1045
    for (int32_t i = 0; i < size; ++i) {
475✔
1046
      SSTriggerCalcParam* pParam = taosArrayGet(pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPesudoFuncVals, i);
361✔
1047
      STimeWindow win = {.skey = pParam->wstart, .ekey = pParam->wend};
361✔
1048
      if (pTaskInfo->pStreamRuntimeInfo->funcInfo.triggerType != STREAM_TRIGGER_SLIDING) {
361✔
1049
        win.ekey++;
156✔
1050
      }
1051
      TSDB_CHECK_NULL(taosArrayPush(pExtW->pWins, &win), code, lino, _exit, terrno);
722!
1052

1053
      SBlockList bl = {.pSrcBlock = pExtW->binfo.pRes, .pBlocks = 0, .blockRowNumThreshold = 4096};
361✔
1054
      code = blockListInit(&bl, 4096);
361✔
1055
      if (code != 0) QUERY_CHECK_CODE(code, lino, _exit);
361!
1056
      TSDB_CHECK_NULL(taosArrayPush(pExtW->pOutputBlocks, &bl), code, lino, _exit, terrno);
722!
1057
    }
1058
    pExtW->outputWinId = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
114✔
1059
    pExtW->lastWinId = -1;
114✔
1060
  }
1061

1062
  while (1) {
310✔
1063
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
424✔
1064
    if (pBlock == NULL) {
421✔
1065
      if (EEXT_MODE_AGG == pExtW->mode) {
111!
1066
        TAOS_CHECK_EXIT(extWindowProcessEmptyWins(pOperator, pBlock, true));
111!
1067
      }
1068
      break;
111✔
1069
    }
1070

1071
    printDataBlock(pBlock, __func__, "externalwindow");
310✔
1072

1073
    qDebug("ext windowpExtW->mode:%d", pExtW->mode);
310✔
1074
    switch (pExtW->mode) {
310!
1075
      case EEXT_MODE_SCALAR:
×
1076
        TAOS_CHECK_EXIT(hashExternalWindowProject(pOperator, pBlock));
×
1077
        break;
×
1078
      case EEXT_MODE_AGG:
310✔
1079
        hashExternalWindowAgg(pOperator, pBlock);
310✔
1080
        break;
310✔
1081
      case EEXT_MODE_INDEFR_FUNC:
×
1082
        TAOS_CHECK_EXIT(hashExternalWindowIndefRows(pOperator, pBlock));
×
1083
        break;
×
1084
      default:
×
1085
        break;
×
1086
    }
1087
  }
1088

1089
  OPTR_SET_OPENED(pOperator);
111✔
1090

1091
  if (pExtW->mode == EEXT_MODE_AGG) {
111!
1092
    qDebug("ext window before dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
111✔
1093

1094
    code = initGroupedResultInfo(&pExtW->groupResInfo, pExtW->aggSup.pResultRowHashTable, pExtW->binfo.inputTsOrder);
111✔
1095
    QUERY_CHECK_CODE(code, lino, _exit);
111!
1096

1097
    qDebug("ext window after dump final rows num:%d", tSimpleHashGetSize(pExtW->aggSup.pResultRowHashTable));
111✔
1098
  }
1099

1100
_exit:
19✔
1101

1102
  if (code != 0) {
111!
1103
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1104
    pTaskInfo->code = code;
×
1105
    T_LONG_JMP(pTaskInfo->env, code);
×
1106
  }
1107
  return code;
111✔
1108
}
1109

1110
static int32_t externalWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
194✔
1111
  int32_t                  code = 0;
194✔
1112
  int32_t                  lino = 0;
194✔
1113
  SExternalWindowOperator* pExtW = pOperator->info;
194✔
1114
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
194✔
1115
  SExprSupp*               pSup = &pOperator->exprSupp;
194✔
1116

1117
  if (pOperator->status == OP_EXEC_DONE) {
194✔
1118
    *ppRes = NULL;
80✔
1119
    return code;
80✔
1120
  }
1121

1122
  SSDataBlock* pBlock = pExtW->binfo.pRes;
114✔
1123
  blockDataCleanup(pBlock);
114✔
1124
  code = pOperator->fpSet._openFn(pOperator);
114✔
1125
  QUERY_CHECK_CODE(code, lino, _end);
111!
1126

1127
  while (1) {
1128
    if (pExtW->mode == EEXT_MODE_SCALAR || pExtW->mode == EEXT_MODE_INDEFR_FUNC) {
111!
1129
      if (pExtW->outputWinId >= taosArrayGetSize(pExtW->pOutputBlocks)) {
×
1130
        pBlock->info.rows = 0;
×
1131
        break;
×
1132
      }
1133
      SBlockList* pList = taosArrayGet(pExtW->pOutputBlocks, pExtW->outputWinId);
×
1134
      // SLimitInfo* pLimitInfo = taosArrayGet(pExtW->pOffsetList, pExtW->outputWinId);
1135
      if (pExtW->pOutputBlockListNode == NULL) {
×
1136
        pExtW->pOutputBlockListNode = tdListGetHead(pList->pBlocks);
×
1137
      } else {
1138
        if (!pExtW->pOutputBlockListNode->dl_next_) {
×
1139
          pExtW->pOutputBlockListNode = NULL;
×
1140
          pExtW->outputWinId++;
×
1141
          incExtWinOutIdx(pOperator);
×
1142
          continue;
×
1143
        }
1144
        pExtW->pOutputBlockListNode = pExtW->pOutputBlockListNode->dl_next_;
×
1145
      }
1146

1147
      if (pExtW->pOutputBlockListNode) {
×
1148
        code = blockDataMerge(pBlock, *(SSDataBlock**)pExtW->pOutputBlockListNode->data);
×
1149
        if (code != 0) goto _end;
×
1150

1151
        // int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock);
1152
        // if (status == EXTERNAL_RETRIEVE_CONTINUE) {
1153
        //   continue;
1154
        // } else if (status == EXTERNAL_RETRIEVE_NEXT_WINDOW) {
1155
        //   pExtW->pOutputBlockListNode = NULL;
1156
        //   pExtW->outputWinId++;
1157
        //   incExtWinOutIdx(pOperator);
1158
        //   continue;
1159
        // } else if (status == EXTERNAL_RETRIEVE_DONE) {
1160
        //   // do nothing, just break
1161
        // }
1162
      } else {
1163
        pExtW->outputWinId++;
×
1164
        incExtWinOutIdx(pOperator);
×
1165
        continue;
×
1166
      }
1167
      break;
×
1168
    } else {
1169
      doBuildResultDatablock(pOperator, &pExtW->binfo, &pExtW->groupResInfo, pExtW->aggSup.pResultBuf);
111✔
1170
      bool hasRemain = hasRemainResults(&pExtW->groupResInfo);
111✔
1171
      if (!hasRemain) {
111!
1172
        setOperatorCompleted(pOperator);
111✔
1173
        break;
111✔
1174
      }
UNCOV
1175
      if (pExtW->binfo.pRes->info.rows > 0) break;
×
1176
    }
1177
  }
1178

1179
  pOperator->resultInfo.totalRows += pExtW->binfo.pRes->info.rows;
111✔
1180
  
1181
_end:
111✔
1182

1183
  if (code != 0) {
111!
1184
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1185
    pTaskInfo->code = code;
×
1186
    T_LONG_JMP(pTaskInfo->env, code);
×
1187
  }
1188
  (*ppRes) = (pExtW->binfo.pRes->info.rows == 0) ? NULL : pExtW->binfo.pRes;
111!
1189

1190
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM && (*ppRes)) {
111!
1191
    printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
111✔
1192
  }
1193
  
1194
  return code;
111✔
1195
}
1196

1197
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, SResultRow** pResult,
×
1198
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
1199
  if (*pResult == NULL) {
×
1200
    *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
×
1201
    if (!*pResult) {
×
1202
      qError("get new resultRow failed, err:%s", tstrerror(terrno));
×
1203
      return terrno;
×
1204
    }
1205
    pResultRowInfo->cur = (SResultRowPosition){.pageId = (*pResult)->pageId, .offset = (*pResult)->offset};
×
1206
  }
1207
  (*pResult)->win = *pWin;
×
1208
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
×
1209
}
1210

1211
static int32_t doMergeAlignExtWindowAgg(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
×
1212
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
×
1213
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
×
1214

1215
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
1216
  SExprSupp*     pSup = &pOperator->exprSupp;
×
1217
  int32_t        code = 0;
×
1218

1219
  int32_t startPos = 0;
×
1220
  int64_t* tsCols = extractTsCol(pBlock, pExtW->primaryTsIndex, pTaskInfo);
×
1221
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
×
1222

1223
  const STimeWindow *pWin = getExtWindow(pOperator, pExtW, ts);
×
1224
  if (pWin == NULL) {
×
1225
    qError("failed to get time window for ts:%" PRId64 ", index:%d, error:%s", ts, pExtW->primaryTsIndex, tstrerror(terrno));
×
1226
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_INVALID_PARA);
×
1227
  }
1228
  code = setSingleOutputTupleBuf(pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup);
×
1229
  if (code != 0 || pMlExtInfo->pResultRow == NULL) {
×
1230
    T_LONG_JMP(pTaskInfo->env, code);
×
1231
  }
1232

1233
  int32_t currPos = startPos;
×
1234
  pMlExtInfo->curTs = pWin->skey;
×
1235
  while (++currPos < pBlock->info.rows) {
×
1236
    if (tsCols[currPos] == pMlExtInfo->curTs) continue;
×
1237

1238
    qDebug("current ts:%" PRId64 ", startPos:%d, currPos:%d, tsCols[currPos]:%" PRId64,
×
1239
      pMlExtInfo->curTs, startPos, currPos, tsCols[currPos]); 
1240
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
×
1241
                                           currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
×
1242
    if (code != 0) {
×
1243
      T_LONG_JMP(pTaskInfo->env, code);
×
1244
    }
1245

1246
    finalizeResultRows(pExtW->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
×
1247
    resetResultRow(pMlExtInfo->pResultRow, pExtW->aggSup.resultRowSize - sizeof(SResultRow));
×
1248

1249
    pWin = getExtNextWindow(pExtW, pTaskInfo);
×
1250
    if (!pWin) break;
×
1251
    qDebug("ext window align2 start:%" PRId64 ", end:%" PRId64, pWin->skey, pWin->ekey);
×
1252
    incExtWinCurIdx(pOperator);
×
1253
    startPos = currPos;
×
1254
    code = setSingleOutputTupleBuf(pResultRowInfo, pWin, &pMlExtInfo->pResultRow, pSup, &pExtW->aggSup);
×
1255
    if (code != 0 || pMlExtInfo->pResultRow == NULL) {
×
1256
      T_LONG_JMP(pTaskInfo->env, code);
×
1257
    }
1258
    pMlExtInfo->curTs = pWin->skey;
×
1259
  }
1260

1261
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pExtW->twAggSup.timeWindowData, startPos,
×
1262
                                         currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
×
1263
  if (code != 0) {
×
1264
    T_LONG_JMP(pTaskInfo->env, code);
×
1265
  }
1266
  return code;
×
1267
}
1268

1269
static int32_t doMergeAlignExtWindowProject(SOperatorInfo* pOperator, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
1270
                                            SSDataBlock* pResultBlock) {
1271
  SExternalWindowOperator* pExtW = pOperator->info;
×
1272
  SExprSupp*               pExprSup = &pExtW->scalarSupp;
×
1273
  int32_t code = projectApplyFunctions(pExprSup->pExprInfo, pResultBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
×
1274
                        GET_STM_RTINFO(pOperator->pTaskInfo));
×
1275
  return code;
×
1276
}
1277

1278
void doMergeAlignExternalWindow(SOperatorInfo* pOperator) {
×
1279
  SExecTaskInfo*                       pTaskInfo = pOperator->pTaskInfo;
×
1280
  SMergeAlignedExternalWindowOperator* pMlExtInfo = pOperator->info;
×
1281
  SExternalWindowOperator*             pExtW = pMlExtInfo->pExtW;
×
1282
  SResultRow*                          pResultRow = NULL;
×
1283
  int32_t                              code = 0;
×
1284
  SSDataBlock*                         pRes = pExtW->binfo.pRes;
×
1285
  SExprSupp*                           pSup = &pOperator->exprSupp;
×
1286
  int32_t                              lino = 0;
×
1287

1288
  while (1) {
×
1289
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
×
1290

1291
    if (pBlock == NULL) {
×
1292
      // close last time window
1293
      if (pMlExtInfo->curTs != INT64_MIN && EEXT_MODE_AGG == pExtW->mode) {
×
1294
        finalizeResultRows(pMlExtInfo->pExtW->aggSup.pResultBuf, &pExtW->binfo.resultRowInfo.cur, pSup, pRes, pTaskInfo);
×
1295
        resetResultRow(pMlExtInfo->pResultRow,pExtW->aggSup.resultRowSize - sizeof(SResultRow));
×
1296
      }
1297
      setOperatorCompleted(pOperator);
×
1298
      break;
×
1299
    }
1300

1301
    pRes->info.scanFlag = pBlock->info.scanFlag;
×
1302
    code = setInputDataBlock(pSup, pBlock, pExtW->binfo.inputTsOrder, pBlock->info.scanFlag, true);
×
1303
    QUERY_CHECK_CODE(code, lino, _end);
×
1304

1305
    printDataBlock(pBlock, __func__, "externalwindowAlign");
×
1306
    qDebug("ext windowpExtWAlign->scalarMode:%d", pExtW->mode);
×
1307

1308

1309
    if (EEXT_MODE_SCALAR == pExtW->mode) {
×
1310
      code = doMergeAlignExtWindowProject(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes);
×
1311
    } else {
1312
      code = doMergeAlignExtWindowAgg(pOperator, &pExtW->binfo.resultRowInfo, pBlock, pRes);
×
1313
    }
1314

1315
    // int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock);
1316
    // if (status == EXTERNAL_RETRIEVE_CONTINUE) {
1317
    //   continue;
1318
    // } else if (status == EXTERNAL_RETRIEVE_NEXT_WINDOW) {
1319
    //   pExtW->pOutputBlockListNode = NULL;
1320
    //   pExtW->outputWinId++;
1321
    //   incExtWinOutIdx(pOperator);
1322
    //   continue;
1323
    // } else if (status == EXTERNAL_RETRIEVE_DONE) {
1324
    //   // do nothing, just break
1325
    // }
1326
  }
1327
_end:
×
1328
  if (code != 0) {
×
1329
    qError("%s failed at line %d since:%s", __func__, lino, tstrerror(code));
×
1330
    pTaskInfo->code = code;
×
1331
    T_LONG_JMP(pTaskInfo->env, code);
×
1332
  }
1333
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc