• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.3
/source/libs/executor/src/countwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
20
#include "operator.h"
21
#include "querytask.h"
22
#include "tcommon.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "ttime.h"
26

27
typedef struct SCountWindowResult {
28
  int32_t    winRows;
29
  SResultRow row;
30
} SCountWindowResult;
31

32
typedef struct SCountWindowSupp {
33
  SArray* pWinStates;
34
  int32_t stateIndex;
35
  int32_t curStateIndex;
36
} SCountWindowSupp;
37

38
typedef struct SCountWindowOperatorInfo {
39
  SOptrBasicInfo     binfo;
40
  SAggSupporter      aggSup;
41
  SExprSupp          scalarSup;
42
  int32_t            tsSlotId;  // primary timestamp column slot id
43
  STimeWindowAggSupp twAggSup;
44
  uint64_t           groupId;  // current group id, used to identify the data block from different groups
45
  SResultRow*        pRow;
46
  int32_t            windowCount;
47
  int32_t            windowSliding;
48
  SCountWindowSupp   countSup;
49
  SSDataBlock*       pPreDataBlock;
50
  int32_t            preStateIndex;
51
} SCountWindowOperatorInfo;
52

53
void destroyCountWindowOperatorInfo(void* param) {
6,889✔
54
  SCountWindowOperatorInfo* pInfo = (SCountWindowOperatorInfo*)param;
6,889✔
55
  if (pInfo == NULL) {
6,889!
56
    return;
×
57
  }
58
  cleanupBasicInfo(&pInfo->binfo);
6,889✔
59
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
6,894✔
60

61
  cleanupAggSup(&pInfo->aggSup);
6,893✔
62
  cleanupExprSupp(&pInfo->scalarSup);
6,893✔
63
  taosArrayDestroy(pInfo->countSup.pWinStates);
6,894✔
64
  taosMemoryFreeClear(param);
6,894!
65
}
66

67
static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
68

69
static void clearWinStateBuff(SCountWindowResult* pBuff) { pBuff->winRows = 0; }
70,133✔
70

71
static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) {
141,225✔
72
  SCountWindowResult* pBuffInfo = taosArrayGet(pCountSup->pWinStates, pCountSup->stateIndex);
141,225✔
73
  pCountSup->curStateIndex = pCountSup->stateIndex;
141,219✔
74
  if (!pBuffInfo) {
141,219!
75
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
76
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
77
    return NULL;
×
78
  }
79
  int32_t size = taosArrayGetSize(pCountSup->pWinStates);
141,219✔
80
  if (size == 0) {
141,218✔
81
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
4✔
82
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
83
    return NULL;
×
84
  }
85
  pCountSup->stateIndex = (pCountSup->stateIndex + 1) % size;
141,214✔
86
  return pBuffInfo;
141,214✔
87
}
88

89
static int32_t setCountWindowOutputBuff(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SResultRow** pResult,
141,223✔
90
                                        SCountWindowResult** ppResBuff) {
91
  int32_t             code = TSDB_CODE_SUCCESS;
141,223✔
92
  int32_t             lino = 0;
141,223✔
93
  SCountWindowResult* pBuff = getCountWinStateInfo(pCountSup);
141,223✔
94
  QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);
141,211!
95
  (*pResult) = &pBuff->row;
141,211✔
96
  code = setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
141,211✔
97
  (*ppResBuff) = pBuff;
141,229✔
98

99
_end:
141,229✔
100
  if (code != TSDB_CODE_SUCCESS) {
141,229!
101
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
102
  }
103
  return code;
141,223✔
104
}
105

106
static int32_t updateCountWindowInfo(int32_t start, int32_t blockRows, int32_t countWinRows, int32_t* pCurrentRows) {
70,973✔
107
  int32_t rows = TMIN(countWinRows - (*pCurrentRows), blockRows - start);
70,973✔
108
  (*pCurrentRows) += rows;
70,973✔
109
  return rows;
70,973✔
110
}
111

112
void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
56,276✔
113
  int32_t                   code = TSDB_CODE_SUCCESS;
56,276✔
114
  int32_t                   lino = 0;
56,276✔
115
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
56,276✔
116
  SExprSupp*                pExprSup = &pOperator->exprSupp;
56,276✔
117
  SCountWindowOperatorInfo* pInfo = pOperator->info;
56,276✔
118
  SSDataBlock*              pRes = pInfo->binfo.pRes;
56,276✔
119
  SColumnInfoData*          pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
56,276✔
120
  QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
56,276!
121
  TSKEY* tsCols = (TSKEY*)pColInfoData->pData;
56,276✔
122
  int32_t numOfBuff = taosArrayGetSize(pInfo->countSup.pWinStates);
56,276✔
123
  if (numOfBuff == 0) {
56,276!
124
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
125
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
126
    T_LONG_JMP(pTaskInfo->env, code);
×
127
  }
128
  pInfo->countSup.stateIndex = (pInfo->preStateIndex + 1) % numOfBuff;
56,276✔
129

130
  int32_t newSize = pRes->info.rows + pBlock->info.rows / pInfo->windowSliding + 1;
56,276✔
131
  if (newSize > pRes->info.capacity) {
56,276!
132
    code = blockDataEnsureCapacity(pRes, newSize);
×
133
    QUERY_CHECK_CODE(code, lino, _end);
×
134
  }
135

136
  for (int32_t i = 0; i < pBlock->info.rows;) {
127,249✔
137
    SCountWindowResult* pBuffInfo = NULL;
70,973✔
138
    code = setCountWindowOutputBuff(pExprSup, &pInfo->countSup, &pInfo->pRow, &pBuffInfo);
70,973✔
139
    if (code != TSDB_CODE_SUCCESS) {
70,973!
140
      qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
141
      T_LONG_JMP(pTaskInfo->env, code);
×
142
    }
143
    int32_t prevRows = pBuffInfo->winRows;
70,973✔
144
    int32_t num = updateCountWindowInfo(i, pBlock->info.rows, pInfo->windowCount, &pBuffInfo->winRows);
70,973✔
145
    int32_t step = num;
70,973✔
146
    if (prevRows == 0) {
70,973✔
147
      pInfo->pRow->win.skey = tsCols[i];
70,133✔
148
    }
149
    pInfo->pRow->win.ekey = tsCols[num + i - 1];
70,973✔
150

151
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->pRow->win, 0);
70,973✔
152
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, i, num,
70,973✔
153
                                           pBlock->info.rows, pExprSup->numOfExprs);
70,973✔
154
    QUERY_CHECK_CODE(code, lino, _end);
70,972!
155
    if (pInfo->windowCount != pInfo->windowSliding) {
70,972✔
156
      if (prevRows <= pInfo->windowSliding) {
2,998!
157
        if (pBuffInfo->winRows > pInfo->windowSliding) {
2,998✔
158
          step = pInfo->windowSliding - prevRows;
519✔
159
        } else {
160
          step = pInfo->windowSliding;
2,479✔
161
        }
162
      } else {
163
        step = 0;
×
164
      }
165
    }
166
    if (pBuffInfo->winRows == pInfo->windowCount) {
70,972✔
167
      doUpdateNumOfRows(pExprSup->pCtx, pInfo->pRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
15,764✔
168
      code = copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes,
15,764✔
169
                                      pExprSup->rowEntryInfoOffset, pTaskInfo);
15,764✔
170
      QUERY_CHECK_CODE(code, lino, _end);
15,765!
171
      pRes->info.rows += pInfo->pRow->numOfRows;
15,765✔
172
      clearWinStateBuff(pBuffInfo);
15,765✔
173
      pInfo->preStateIndex = pInfo->countSup.curStateIndex;
15,765✔
174
      clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
15,765✔
175
    }
176
    i += step;
70,973✔
177
  }
178

179
  code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
56,276✔
180
  QUERY_CHECK_CODE(code, lino, _end);
56,276!
181

182
_end:
56,276✔
183
  if (code != TSDB_CODE_SUCCESS) {
56,276!
184
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
185
    pTaskInfo->code = code;
×
186
    T_LONG_JMP(pTaskInfo->env, code);
×
187
  }
188
}
56,276✔
189

190
static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SExecTaskInfo* pTaskInfo,
62,270✔
191
                             SFilterInfo* pFilterInfo, int32_t preStateIndex, SSDataBlock* pBlock) {
192
  SResultRow* pResultRow = NULL;
62,270✔
193
  int32_t     code = TSDB_CODE_SUCCESS;
62,270✔
194
  int32_t     lino = 0;
62,270✔
195
  int32_t     numOfBuff = taosArrayGetSize(pCountSup->pWinStates);
62,270✔
196
  int32_t     newSize = pBlock->info.rows + numOfBuff;
62,271✔
197
  if (newSize > pBlock->info.capacity) {
62,271!
UNCOV
198
    code = blockDataEnsureCapacity(pBlock, newSize);
×
199
    QUERY_CHECK_CODE(code, lino, _end);
×
200
  }
201
  pCountSup->stateIndex = (preStateIndex + 1) % numOfBuff;
62,275✔
202
  for (int32_t i = 0; i < numOfBuff; i++) {
132,529✔
203
    SCountWindowResult* pBuff = NULL;
70,251✔
204
    code = setCountWindowOutputBuff(pExprSup, pCountSup, &pResultRow, &pBuff);
70,251✔
205
    QUERY_CHECK_CODE(code, lino, _end);
70,250!
206
    if (pBuff->winRows == 0) {
70,250✔
207
      continue;
15,886✔
208
    }
209
    doUpdateNumOfRows(pExprSup->pCtx, pResultRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
54,364✔
210
    code = copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResultRow, pExprSup->pCtx, pBlock,
54,368✔
211
                                    pExprSup->rowEntryInfoOffset, pTaskInfo);
54,368✔
212
    QUERY_CHECK_CODE(code, lino, _end);
54,368!
213
    pBlock->info.rows += pResultRow->numOfRows;
54,368✔
214
    clearWinStateBuff(pBuff);
54,368✔
215
    clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
54,368✔
216
  }
217
  code = doFilter(pBlock, pFilterInfo, NULL);
62,278✔
218
  QUERY_CHECK_CODE(code, lino, _end);
62,262!
219

220
_end:
62,262✔
221
  if (code != TSDB_CODE_SUCCESS) {
62,262!
222
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
223
    T_LONG_JMP(pTaskInfo->env, code);
×
224
  }
225
}
62,262✔
226

227
static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
61,393✔
228
  int32_t                   code = TSDB_CODE_SUCCESS;
61,393✔
229
  int32_t                   lino = 0;
61,393✔
230
  SCountWindowOperatorInfo* pInfo = pOperator->info;
61,393✔
231
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
61,393✔
232
  SExprSupp*                pExprSup = &pOperator->exprSupp;
61,393✔
233
  int32_t                   order = pInfo->binfo.inputTsOrder;
61,393✔
234
  SSDataBlock*              pRes = pInfo->binfo.pRes;
61,393✔
235

236
  blockDataCleanup(pRes);
61,393✔
237

238
  while (1) {
56,276✔
239
    SSDataBlock* pBlock = NULL;
117,714✔
240
    if (pInfo->pPreDataBlock == NULL) { 
117,714✔
241
      pBlock = getNextBlockFromDownstream(pOperator, 0);
65,232✔
242
    } else {
243
      pBlock = pInfo->pPreDataBlock;
52,482✔
244
      pInfo->pPreDataBlock = NULL;
52,482✔
245
    }
246

247
    if (pBlock == NULL) {
117,719✔
248
      break;
9,783✔
249
    }
250

251
    pRes->info.scanFlag = pBlock->info.scanFlag;
107,936✔
252
    code = setInputDataBlock(pExprSup, pBlock, order, MAIN_SCAN, true);
107,936✔
253
    QUERY_CHECK_CODE(code, lino, _end);
107,936!
254

255
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
107,936✔
256
    QUERY_CHECK_CODE(code, lino, _end);
107,936!
257

258
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
259
    if (pInfo->scalarSup.pExprInfo != NULL) {
107,936!
260
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
×
261
                                   pInfo->scalarSup.numOfExprs, NULL);
262
      QUERY_CHECK_CODE(code, lino, _end);
×
263
    }
264

265
    if (pInfo->groupId == 0) {
107,936✔
266
      pInfo->groupId = pBlock->info.id.groupId;
2,954✔
267
    } else if (pInfo->groupId != pBlock->info.id.groupId) {
104,982✔
268
      pInfo->pPreDataBlock = pBlock;
52,482✔
269
      pRes->info.id.groupId = pInfo->groupId;
52,482✔
270
      buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pInfo->preStateIndex, pRes);
52,482✔
271
      pInfo->groupId = pBlock->info.id.groupId;
52,482✔
272
      if (pRes->info.rows > 0) {
52,482✔
273
        (*ppRes) = pRes;
51,660✔
274
        return code;
51,660✔
275
      }
276
    }
277

278
    doCountWindowAggImpl(pOperator, pBlock);
56,276✔
279
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
56,276!
280
      pRes->info.id.groupId = pInfo->groupId;
×
281
      (*ppRes) = pRes;
×
282
      return code;
×
283
    }
284
  }
285

286
  pRes->info.id.groupId = pInfo->groupId;
9,783✔
287
  buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pInfo->preStateIndex, pRes);
9,783✔
288

289
_end:
9,774✔
290
  if (code != TSDB_CODE_SUCCESS) {
9,774!
291
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
292
    pTaskInfo->code = code;
×
293
    T_LONG_JMP(pTaskInfo->env, code);
×
294
  }
295
  (*ppRes) = pRes->info.rows == 0 ? NULL : pRes;
9,774✔
296
  return code;
9,774✔
297
}
298

299
int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
6,847✔
300
                                             SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
301
  QRY_PARAM_CHECK(pOptrInfo);
6,847!
302

303
  int32_t                   code = TSDB_CODE_SUCCESS;
6,847✔
304
  int32_t                   lino = 0;
6,847✔
305
  SCountWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SCountWindowOperatorInfo));
6,847!
306
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
6,861!
307
  if (pInfo == NULL || pOperator == NULL) {
6,855!
308
    code = terrno;
×
309
    goto _error;
×
310
  }
311

312
  pOperator->exprSupp.hasWindowOrGroup = true;
6,859✔
313

314
  SCountWinodwPhysiNode* pCountWindowNode = (SCountWinodwPhysiNode*)physiNode;
6,859✔
315

316
  pInfo->tsSlotId = ((SColumnNode*)pCountWindowNode->window.pTspk)->slotId;
6,859✔
317

318
  if (pCountWindowNode->window.pExprs != NULL) {
6,859!
319
    int32_t    numOfScalarExpr = 0;
×
320
    SExprInfo* pScalarExprInfo = NULL;
×
321
    code = createExprInfo(pCountWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
×
322
    QUERY_CHECK_CODE(code, lino, _error);
×
323
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
×
324
    QUERY_CHECK_CODE(code, lino, _error);
×
325
  }
326

327
  size_t     keyBufSize = 0;
6,859✔
328
  int32_t    num = 0;
6,859✔
329
  SExprInfo* pExprInfo = NULL;
6,859✔
330
  code = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &pExprInfo, &num);
6,859✔
331
  QUERY_CHECK_CODE(code, lino, _error);
6,885!
332

333
  initResultSizeInfo(&pOperator->resultInfo, 4096);
6,885✔
334

335
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
6,888✔
336
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
6,888✔
337
  QUERY_CHECK_CODE(code, lino, _error);
6,879!
338

339
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc);
6,879✔
340
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
6,890!
341
  initBasicInfo(&pInfo->binfo, pResBlock);
6,890✔
342

343
  code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
6,890✔
344
  QUERY_CHECK_CODE(code, lino, _error);
6,892!
345

346
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
6,892✔
347
  pInfo->binfo.inputTsOrder = physiNode->inputTsOrder;
6,892✔
348
  pInfo->binfo.outputTsOrder = physiNode->outputTsOrder;
6,892✔
349
  pInfo->windowCount = pCountWindowNode->windowCount;
6,892✔
350
  pInfo->windowSliding = pCountWindowNode->windowSliding;
6,892✔
351
  // sizeof(SCountWindowResult)
352
  int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize;
6,892✔
353
  int32_t numOfItem = 1;
6,892✔
354
  if (pInfo->windowCount != pInfo->windowSliding) {
6,892✔
355
    numOfItem = pInfo->windowCount / pInfo->windowSliding + 1;
187✔
356
  }
357

358
  pInfo->countSup.pWinStates = taosArrayInit_s(itemSize, numOfItem);
6,892✔
359
  if (!pInfo->countSup.pWinStates) {
6,887!
360
    goto _error;
×
361
  }
362

363
  pInfo->countSup.stateIndex = 0;
6,887✔
364
  pInfo->pPreDataBlock = NULL;
6,887✔
365
  pInfo->preStateIndex = 0;
6,887✔
366

367
  code = filterInitFromNode((SNode*)pCountWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
6,887✔
368
  QUERY_CHECK_CODE(code, lino, _error);
6,884!
369

370
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
6,884✔
371
  QUERY_CHECK_CODE(code, lino, _error);
6,885!
372

373
  setOperatorInfo(pOperator, "CountWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, true, OP_NOT_OPENED, pInfo,
6,885✔
374
                  pTaskInfo);
375
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, countWindowAggregateNext, NULL, destroyCountWindowOperatorInfo,
6,881✔
376
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
377

378
  code = appendDownstream(pOperator, &downstream, 1);
6,879✔
379
  if (code != TSDB_CODE_SUCCESS) {
6,886!
380
    goto _error;
×
381
  }
382

383
  *pOptrInfo = pOperator;
6,886✔
384
  return TSDB_CODE_SUCCESS;
6,886✔
385

386
_error:
×
387
  if (pInfo != NULL) {
×
388
    destroyCountWindowOperatorInfo(pInfo);
×
389
  }
390

391
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
392
  pTaskInfo->code = code;
×
393
  return code;
×
394
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc