• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5049

11 May 2026 06:30AM UTC coverage: 73.313% (+0.09%) from 73.222%
#5049

push

travis-ci

web-flow
feat: refactor taosdump code to improve backup speed and compression ratio (#35292)

6625 of 8435 new or added lines in 28 files covered. (78.54%)

2491 existing lines in 142 files now uncovered.

281233 of 383605 relevant lines covered (73.31%)

132489999.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.63
/source/libs/executor/src/countwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
20
#include "operator.h"
21
#include "querytask.h"
22
#include "tcommon.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "ttime.h"
26

27
typedef struct SCountWindowResult {
28
  int32_t    winRows;
29
  TSKEY      winSKey;
30
  SResultRow row;
31
} SCountWindowResult;
32

33
typedef struct SCountWindowSupp {
34
  SArray* pWinStates;
35
  int32_t stateIndex;
36
  int32_t curStateIndex;
37
  TSKEY   lastTs; // this ts is used to record the last timestamp, so that we can know whether the new row's ts is duplicated
38
} SCountWindowSupp;
39

40
typedef struct SCountWindowOperatorInfo {
41
  SOptrBasicInfo     binfo;
42
  SAggSupporter      aggSup;
43
  SExprSupp          scalarSup;
44
  int32_t            tsSlotId;  // primary timestamp column slot id
45
  STimeWindowAggSupp twAggSup;
46
  uint64_t           groupId;  // current group id, used to identify the data block from different groups
47
  SResultRow*        pRow;
48
  int32_t            windowCount;
49
  int32_t            windowSliding;
50
  SCountWindowSupp   countSup;
51
  SSDataBlock*       pPreDataBlock;
52
  int32_t            preStateIndex;
53
  bool               indefRowsMode;
54
  SIndefRowsRuntime  indefRows;
55
  struct SOperatorInfo* pOperator;
56
} SCountWindowOperatorInfo;
57

58
void destroyCountWindowOperatorInfo(void* param) {
290,842✔
59
  SCountWindowOperatorInfo* pInfo = (SCountWindowOperatorInfo*)param;
290,842✔
60
  if (pInfo == NULL) {
290,842✔
61
    return;
×
62
  }
63
  cleanupBasicInfo(&pInfo->binfo);
290,842✔
64
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
290,842✔
65
  cleanupIndefRowsRuntime(&pInfo->indefRows, pInfo->pOperator);
290,842✔
66

67
  cleanupAggSup(&pInfo->aggSup);
290,842✔
68
  cleanupExprSupp(&pInfo->scalarSup);
290,842✔
69
  taosArrayDestroy(pInfo->countSup.pWinStates);
290,842✔
70
  taosMemoryFreeClear(param);
290,842✔
71
}
72

73
static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
74

75
// Marks a window slot as empty by zeroing its row counter and start key.
// The embedded result row is left untouched.
static void clearWinStateBuff(SCountWindowResult* pBuff) {
  pBuff->winSKey = 0;
  pBuff->winRows = 0;
}
79

80
// Hands out the window slot at pCountSup->stateIndex and advances the ring cursor.
//
// curStateIndex is set to the index of the slot that was requested, even when
// the lookup fails. Returns NULL with terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR
// if the slot cannot be fetched or the ring is empty.
static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) {
  int32_t             slot = pCountSup->stateIndex;
  SCountWindowResult* pSlotBuff = taosArrayGet(pCountSup->pWinStates, slot);

  pCountSup->curStateIndex = slot;
  if (pSlotBuff == NULL) {
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return NULL;
  }

  int32_t numOfSlots = taosArrayGetSize(pCountSup->pWinStates);
  if (numOfSlots == 0) {
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return NULL;
  }

  // Wrap around so the slots are reused in ring order.
  pCountSup->stateIndex = (slot + 1) % numOfSlots;
  return pSlotBuff;
}
97

98
static int32_t setCountWindowOutputBuff(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SResultRow** pResult,
136,445,005✔
99
                                        SCountWindowResult** ppResBuff) {
100
  int32_t             code = TSDB_CODE_SUCCESS;
136,445,005✔
101
  int32_t             lino = 0;
136,445,005✔
102
  SCountWindowResult* pBuff = getCountWinStateInfo(pCountSup);
136,445,005✔
103
  QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);
136,445,005✔
104
  (*pResult) = &pBuff->row;
136,445,005✔
105
  code = setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
136,445,005✔
106
  (*ppResBuff) = pBuff;
136,445,005✔
107

108
_end:
136,445,005✔
109
  if (code != TSDB_CODE_SUCCESS) {
136,445,005✔
110
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
111
  }
112
  return code;
136,445,005✔
113
}
114

115
// Decides how many rows of the block, starting at row `start`, go into the
// current window: the smaller of the room left in the window
// (countWinRows - *pCurrentRows) and the rows left in the block
// (blockRows - start). Adds that amount to *pCurrentRows and returns it.
static int32_t updateCountWindowInfo(int32_t start, int32_t blockRows, int32_t countWinRows, int32_t* pCurrentRows) {
  int32_t roomInWindow = countWinRows - (*pCurrentRows);
  int32_t rowsLeftInBlock = blockRows - start;
  int32_t taken = TMIN(roomInWindow, rowsLeftInBlock);

  *pCurrentRows += taken;
  return taken;
}
120

121
// Feeds one input block into the count-window state of the operator.
//
// The block is walked row range by row range. Each range is applied to the
// next slot of the window ring, and a window whose row count reaches
// windowCount is closed: in aggregate mode its result row is copied into
// binfo.pRes, in indefinite-rows mode the matching window state is closed
// through the indefRows runtime. In aggregate mode the result block is passed
// through the operator filter once all rows are consumed.
//
// Outside PRE_SCAN, a row whose timestamp equals that of the previously seen
// row (tracked across blocks in countSup.lastTs) fails the query with
// TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP.
//
// The function returns nothing; every failure leaves through
// T_LONG_JMP(pTaskInfo->env, code).
void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                pExprSup = &pOperator->exprSupp;
  SCountWindowOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*              pRes = pInfo->binfo.pRes;
  SColumnInfoData*          pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
  TSKEY* tsCols = (TSKEY*)pColInfoData->pData;
  int32_t numOfBuff = taosArrayGetSize(pInfo->countSup.pWinStates);
  if (numOfBuff == 0) {
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
  // Resume with the slot that follows the most recently closed window.
  pInfo->countSup.stateIndex = (pInfo->preStateIndex + 1) % numOfBuff;

  // Make room in the result block: rows already present, plus one row per
  // windowSliding input rows, plus one.
  int32_t newSize = pRes->info.rows + pBlock->info.rows / pInfo->windowSliding + 1;
  if (!pInfo->indefRowsMode && newSize > pRes->info.capacity) {
    code = blockDataEnsureCapacity(pRes, newSize);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // Reject consecutive rows that carry the same timestamp.
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
    if (pBlock->info.scanFlag != PRE_SCAN) {
      if (pInfo->countSup.lastTs == INT64_MIN) {
        pInfo->countSup.lastTs = tsCols[i];
      } else {
        if (tsCols[i] == pInfo->countSup.lastTs) {
          qError("duplicate timestamp found in count window operator, timestamp: %" PRId64, tsCols[i]);
          code = TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP;
          QUERY_CHECK_CODE(code, lino, _end);
        } else {
          pInfo->countSup.lastTs = tsCols[i];
        }
      }
    }
  }

  // `i` is advanced by `step` at the bottom of the body, not in the for header.
  for (int32_t i = 0; i < pBlock->info.rows;) {
    SCountWindowResult* pBuffInfo = NULL;
    if (!pInfo->indefRowsMode) {
      code = setCountWindowOutputBuff(pExprSup, &pInfo->countSup, &pInfo->pRow, &pBuffInfo);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
    } else {
      pBuffInfo = getCountWinStateInfo(&pInfo->countSup);
      QUERY_CHECK_NULL(pBuffInfo, code, lino, _end, terrno);
    }
    int32_t prevRows = pBuffInfo->winRows;
    // `num` rows starting at row i go into this window.
    int32_t num = updateCountWindowInfo(i, pBlock->info.rows, pInfo->windowCount, &pBuffInfo->winRows);
    int32_t step = num;
    if (prevRows == 0) {
      // First rows of a fresh window: remember where it starts.
      pBuffInfo->winSKey = tsCols[i];
      if (!pInfo->indefRowsMode) {
        pInfo->pRow->win.skey = tsCols[i];
      }
    }
    STimeWindow win = {.skey = pBuffInfo->winSKey, .ekey = tsCols[num + i - 1]};
    if (pInfo->indefRowsMode) {
      SIndefRowsWindowState* pState = NULL;
      code = applyIndefRowsFuncOnWindowState(pOperator, &pInfo->indefRows, &pState, pInfo->binfo.pRes,
                                             pBlock->info.id.groupId, &win, pBlock, i, num, pInfo->binfo.inputTsOrder,
                                             pInfo->aggSup.resultRowSize);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      pInfo->pRow->win.ekey = win.ekey;
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->pRow->win, 0);
      code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, i, num,
                                             pBlock->info.rows, pExprSup->numOfExprs);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    // When the sliding step differs from the window size, the read position
    // advances according to windowSliding instead of by the rows just consumed.
    if (pInfo->windowCount != pInfo->windowSliding) {
      if (prevRows <= pInfo->windowSliding) {
        if (pBuffInfo->winRows > pInfo->windowSliding) {
          step = pInfo->windowSliding - prevRows;
        } else {
          step = pInfo->windowSliding;
        }
      } else {
        step = 0;
      }
    }
    if (pBuffInfo->winRows == pInfo->windowCount) {
      // The window is full: emit it and recycle the slot.
      if (pInfo->indefRowsMode) {
        SIndefRowsWindowState* pState = findIndefRowsWindowState(&pInfo->indefRows, pBlock->info.id.groupId,
                                                                 pBuffInfo->winSKey);
        QUERY_CHECK_NULL(pState, code, lino, _end, TSDB_CODE_QRY_WINDOW_STATE_NOT_EXIST);
        code = closeIndefRowsWindowState(pOperator, &pInfo->indefRows, pState);
        QUERY_CHECK_CODE(code, lino, _end);
      } else {
        doUpdateNumOfRows(pExprSup->pCtx, pInfo->pRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
        code = copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes,
                                        pExprSup->rowEntryInfoOffset, pTaskInfo);
        QUERY_CHECK_CODE(code, lino, _end);
        pRes->info.rows += pInfo->pRow->numOfRows;
      }
      clearWinStateBuff(pBuffInfo);
      pInfo->preStateIndex = pInfo->countSup.curStateIndex;
      if (!pInfo->indefRowsMode) {
        clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
      }
    }
    i += step;
  }

  if (!pInfo->indefRowsMode) {
    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
242

243
// Flushes every window that is still open for `groupId`.
//
// The ring is walked once, starting after the most recently closed slot.
// Slots with no rows are skipped. For every other slot the window is closed:
// in aggregate mode its result row is copied into pBlock, in indefinite-rows
// mode the matching window state is closed through the indefRows runtime.
// The slot is then cleared. In aggregate mode pBlock is finally passed through
// the operator filter.
//
// The function returns nothing; failures leave through
// T_LONG_JMP(pTaskInfo->env, code).
static void buildCountResult(SOperatorInfo* pOperator, SCountWindowOperatorInfo* pInfo, uint64_t groupId, SSDataBlock* pBlock) {
  SExprSupp*        pExprSup = &pOperator->exprSupp;
  SCountWindowSupp* pCountSup = &pInfo->countSup;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SFilterInfo*      pFilterInfo = pOperator->exprSupp.pFilterInfo;
  SResultRow* pResultRow = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int32_t     numOfBuff = taosArrayGetSize(pCountSup->pWinStates);
  // An empty ring would make the modulo below divide by zero; treat it as an
  // internal error, the same way doCountWindowAggImpl() does.
  if (numOfBuff == 0) {
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _end;
  }
  // Worst case every slot contributes one more row.
  int32_t     newSize = pBlock->info.rows + numOfBuff;
  if (!pInfo->indefRowsMode && newSize > pBlock->info.capacity) {
    code = blockDataEnsureCapacity(pBlock, newSize);
    QUERY_CHECK_CODE(code, lino, _end);
  }
  pCountSup->stateIndex = (pInfo->preStateIndex + 1) % numOfBuff;
  for (int32_t i = 0; i < numOfBuff; i++) {
    SCountWindowResult* pBuff = NULL;
    if (!pInfo->indefRowsMode) {
      code = setCountWindowOutputBuff(pExprSup, pCountSup, &pResultRow, &pBuff);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      pBuff = getCountWinStateInfo(pCountSup);
      QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);
    }
    if (pBuff->winRows == 0) {
      // Nothing pending in this slot.
      continue;
    }
    if (pInfo->indefRowsMode) {
      SIndefRowsWindowState* pState = findIndefRowsWindowState(&pInfo->indefRows, groupId, pBuff->winSKey);
      QUERY_CHECK_NULL(pState, code, lino, _end, TSDB_CODE_QRY_WINDOW_STATE_NOT_EXIST);
      code = closeIndefRowsWindowState(pOperator, &pInfo->indefRows, pState);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      doUpdateNumOfRows(pExprSup->pCtx, pResultRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
      code = copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResultRow, pExprSup->pCtx, pBlock,
                                      pExprSup->rowEntryInfoOffset, pTaskInfo);
      QUERY_CHECK_CODE(code, lino, _end);
      pBlock->info.rows += pResultRow->numOfRows;
    }
    clearWinStateBuff(pBuff);
    if (!pInfo->indefRowsMode) {
      clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
    }
  }
  if (!pInfo->indefRowsMode) {
    code = doFilter(pBlock, pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
298

299
// getNext function of the count-window operator.
//
// Pulls blocks from downstream 0, feeds them to doCountWindowAggImpl() and
// hands back the next result block through *ppRes (NULL when there is nothing
// to return). A block whose group id differs from the current one first
// triggers a flush of the current group through buildCountResult(); the block
// is parked in pPreDataBlock while the flushed rows are returned.
//
// In indefinite-rows mode results come from getNextIndefRowsResultBlock();
// otherwise they accumulate in binfo.pRes and are returned once the row count
// reaches resultInfo.threshold, on a group switch, or at end of input.
//
// Returns TSDB_CODE_SUCCESS. Failures detected here store the code in
// pTaskInfo->code and leave through T_LONG_JMP().
static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SCountWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                pExprSup = &pOperator->exprSupp;
  int32_t                   order = pInfo->binfo.inputTsOrder;
  SSDataBlock*              pRes = pInfo->binfo.pRes;

  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return code;
  }

  // Drain results that are already pending in the indefinite-rows runtime.
  if (pInfo->indefRowsMode) {
    (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
    if ((*ppRes) != NULL) {
      return code;
    }
  }

  blockDataCleanup(pRes);

  while (1) {
    SSDataBlock* pBlock = NULL;
    if (pInfo->pPreDataBlock == NULL) {
      pBlock = getNextBlockFromDownstream(pOperator, 0);
    } else {
      // Resume with the block that was parked on the last group switch.
      pBlock = pInfo->pPreDataBlock;
      pInfo->pPreDataBlock = NULL;
    }

    if (pBlock == NULL) {
      break;
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pExprSup, pBlock, order, MAIN_SCAN, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pInfo->groupId == 0) {
      pInfo->groupId = pBlock->info.id.groupId;
    } else if (pInfo->groupId != pBlock->info.id.groupId) {
      // Group switch: flush the windows of the old group before touching the new block.
      pInfo->pPreDataBlock = pBlock;
      pRes->info.id.groupId = pInfo->groupId;
      buildCountResult(pOperator, pInfo, pInfo->groupId, pRes);
      pInfo->groupId = pBlock->info.id.groupId;
      pInfo->countSup.lastTs = INT64_MIN;
      if (pInfo->indefRowsMode) {
        (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
        if ((*ppRes) != NULL) {
          return code;
        }
        continue;
      }
      if (pRes->info.rows > 0) {
        (*ppRes) = pRes;
        return code;
      }
      // The flush produced no rows, so the block is aggregated right below.
      // It must not stay parked, or the next loop iteration would pick it up
      // and aggregate it a second time.
      pInfo->pPreDataBlock = NULL;
    }

    doCountWindowAggImpl(pOperator, pBlock);
    if (pInfo->indefRowsMode) {
      (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
      if ((*ppRes) != NULL) {
        return code;
      }
      continue;
    }

    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      pRes->info.id.groupId = pInfo->groupId;
      (*ppRes) = pRes;
      return code;
    }
  }

  // Input exhausted: flush whatever is still open for the last group.
  pRes->info.id.groupId = pInfo->groupId;
  buildCountResult(pOperator, pInfo, pInfo->groupId, pRes);

  if (pInfo->indefRowsMode) {
    (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
    if ((*ppRes) == NULL) {
      setOperatorCompleted(pOperator);
    }
    return code;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = pRes->info.rows == 0 ? NULL : pRes;
  return code;
}
406

407
// Puts the operator back into its not-opened state so it can run again.
//
// Clears the group and ring cursors, empties every window slot (counters and
// result-row bytes), then rebuilds the time-window column, the aggregate
// support and the scalar expression support from the physical plan node.
// Returns the first non-zero code reported by those rebuild steps, or 0.
static int32_t resetCountWindowOperatorState(SOperatorInfo* pOper) {
  SCountWindowOperatorInfo* pCount = pOper->info;
  SExecTaskInfo*            pTaskInfo = pOper->pTaskInfo;
  SCountWindowPhysiNode*    pPhynode = (SCountWindowPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pCount->binfo);
  pCount->groupId = 0;
  pCount->countSup.stateIndex = 0;
  pCount->countSup.lastTs = INT64_MIN;
  pCount->pPreDataBlock = NULL;
  pCount->preStateIndex = 0;
  pCount->pRow = NULL;
  resetIndefRowsRuntime(&pCount->indefRows, pOper);

  int32_t numOfSlots = taosArrayGetSize(pCount->countSup.pWinStates);
  for (int32_t slot = 0; slot < numOfSlots; ++slot) {
    SCountWindowResult* pSlotBuff = taosArrayGet(pCount->countSup.pWinStates, slot);
    if (pSlotBuff == NULL) {
      continue;
    }
    clearWinStateBuff(pSlotBuff);
    // The row occupies resultRowSize bytes inside the array item.
    memset(&pSlotBuff->row, 0, pCount->aggSup.resultRowSize);
  }

  colDataDestroy(&pCount->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pCount->twAggSup.timeWindowData, &pTaskInfo->window);
  if (code != 0) {
    return code;
  }

  code = resetAggSup(&pOper->exprSupp, &pCount->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
                     sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                     &pTaskInfo->storageAPI.functionStore);
  if (code != 0) {
    return code;
  }

  return resetExprSupp(&pCount->scalarSup, pTaskInfo, pPhynode->window.pExprs, NULL,
                       &pTaskInfo->storageAPI.functionStore);
}
444

445
int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
290,354✔
446
                                             SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
447
  QRY_PARAM_CHECK(pOptrInfo);
290,354✔
448

449
  int32_t                   code = TSDB_CODE_SUCCESS;
290,354✔
450
  int32_t                   lino = 0;
290,354✔
451
  SCountWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SCountWindowOperatorInfo));
290,354✔
452
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
290,354✔
453
  if (pInfo == NULL || pOperator == NULL) {
290,842✔
UNCOV
454
    code = terrno;
×
455
    goto _error;
×
456
  }
457
  initOperatorCostInfo(pOperator);
290,842✔
458

459
  pOperator->pPhyNode = physiNode;
290,842✔
460
  pOperator->exprSupp.hasWindowOrGroup = true;
290,842✔
461
  pOperator->exprSupp.hasWindow = true;
290,842✔
462

463
  SCountWindowPhysiNode* pCountWindowNode = (SCountWindowPhysiNode*)physiNode;
290,842✔
464

465
  pInfo->tsSlotId = ((SColumnNode*)pCountWindowNode->window.pTspk)->slotId;
290,842✔
466

467
  if (pCountWindowNode->window.pExprs != NULL) {
290,842✔
468
    int32_t    numOfScalarExpr = 0;
70,145✔
469
    SExprInfo* pScalarExprInfo = NULL;
70,145✔
470
    code = createExprInfo(pCountWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
70,145✔
471
    QUERY_CHECK_CODE(code, lino, _error);
70,145✔
472
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
70,145✔
473
    QUERY_CHECK_CODE(code, lino, _error);
70,145✔
474
  }
475

476
  size_t     keyBufSize = 0;
290,842✔
477
  int32_t    num = 0;
290,842✔
478
  SExprInfo* pExprInfo = NULL;
290,842✔
479
  code = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &pExprInfo, &num);
290,842✔
480
  QUERY_CHECK_CODE(code, lino, _error);
290,842✔
481

482
  initResultSizeInfo(&pOperator->resultInfo, 4096);
290,842✔
483

484
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
290,842✔
485
                    NULL, &pTaskInfo->storageAPI.functionStore);
486
  QUERY_CHECK_CODE(code, lino, _error);
290,842✔
487

488
  pInfo->indefRowsMode = pCountWindowNode->window.indefRowsFunc;
290,842✔
489
  if (pInfo->indefRowsMode) {
290,842✔
490
    code = initIndefRowsRuntime(&pInfo->indefRows, pOperator->exprSupp.pCtx, num, pOperator->resultInfo.capacity);
12,398✔
491
    QUERY_CHECK_CODE(code, lino, _error);
12,398✔
492
  }
493

494
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc);
290,842✔
495
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
290,842✔
496
  initBasicInfo(&pInfo->binfo, pResBlock);
290,842✔
497

498
  code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
290,842✔
499
  QUERY_CHECK_CODE(code, lino, _error);
290,842✔
500

501
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
290,842✔
502
  pInfo->binfo.inputTsOrder = physiNode->inputTsOrder;
290,842✔
503
  pInfo->binfo.outputTsOrder = physiNode->outputTsOrder;
290,842✔
504
  pInfo->windowCount = pCountWindowNode->windowCount;
290,842✔
505
  pInfo->windowSliding = pCountWindowNode->windowSliding;
290,842✔
506
  // sizeof(SCountWindowResult)
507
  int32_t itemSize = sizeof(SCountWindowResult) - sizeof(SResultRow) + pInfo->aggSup.resultRowSize;
290,842✔
508
  int32_t numOfItem = 1;
290,842✔
509
  if (pInfo->windowCount != pInfo->windowSliding) {
290,842✔
510
    numOfItem = pInfo->windowCount / pInfo->windowSliding + 1;
19,694✔
511
  }
512

513
  pInfo->countSup.pWinStates = taosArrayInit_s(itemSize, numOfItem);
290,842✔
514
  if (!pInfo->countSup.pWinStates) {
290,842✔
515
    goto _error;
×
516
  }
517

518
  pInfo->countSup.stateIndex = 0;
290,842✔
519
  pInfo->pPreDataBlock = NULL;
290,842✔
520
  pInfo->preStateIndex = 0;
290,842✔
521
  pInfo->countSup.lastTs = INT64_MIN;
290,842✔
522
  pInfo->pOperator = pOperator;
290,842✔
523

524
  code = filterInitFromNode((SNode*)pCountWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
290,842✔
525
                            pTaskInfo->pStreamRuntimeInfo);
290,842✔
526
  QUERY_CHECK_CODE(code, lino, _error);
290,842✔
527
  filterSetExecContext(pOperator->exprSupp.pFilterInfo, pTaskInfo, isTaskKilled);
290,842✔
528

529
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
290,842✔
530
  QUERY_CHECK_CODE(code, lino, _error);
290,842✔
531

532
  setOperatorInfo(pOperator, "CountWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, true, OP_NOT_OPENED, pInfo,
290,842✔
533
                  pTaskInfo);
534
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, countWindowAggregateNext, NULL, destroyCountWindowOperatorInfo,
290,842✔
535
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
536
  setOperatorResetStateFn(pOperator, resetCountWindowOperatorState);                                    
290,842✔
537
  code = appendDownstream(pOperator, &downstream, 1);
290,842✔
538
  if (code != TSDB_CODE_SUCCESS) {
290,842✔
539
    goto _error;
×
540
  }
541

542
  *pOptrInfo = pOperator;
290,842✔
543
  return TSDB_CODE_SUCCESS;
290,842✔
544

545
_error:
×
546
  if (pInfo != NULL) {
×
547
    destroyCountWindowOperatorInfo(pInfo);
×
548
  }
549

550
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
551
  pTaskInfo->code = code;
×
552
  return code;
×
553
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc