• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3626

28 Feb 2025 03:34AM UTC coverage: 63.764% (+0.1%) from 63.633%
#3626

push

travis-ci

web-flow
Merge pull request #29961 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

149233 of 299935 branches covered (49.76%)

Branch coverage included in aggregate %.

53 of 91 new or added lines in 8 files covered. (58.24%)

3267 existing lines in 138 files now uncovered.

233601 of 300457 relevant lines covered (77.75%)

17374158.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.21
/source/libs/executor/src/streamcountwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "function.h"
17
#include "functionMgt.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "streamexecutorInt.h"
21
#include "tchecksum.h"
22
#include "tcommon.h"
23
#include "tdatablock.h"
24
#include "tglobal.h"
25
#include "tlog.h"
26
#include "ttime.h"
27

28
#define STREAM_COUNT_OP_STATE_NAME      "StreamCountHistoryState"
29
#define STREAM_COUNT_OP_CHECKPOINT_NAME "StreamCountOperator_Checkpoint"
30

31
typedef struct SCountWindowInfo {
32
  SResultWindowInfo winInfo;
33
  COUNT_TYPE*       pWindowCount;
34
} SCountWindowInfo;
35

36
typedef enum {
37
  NONE_WINDOW = 0,
38
  CREATE_NEW_WINDOW,
39
  MOVE_NEXT_WINDOW,
40
} BuffOp;
41
typedef struct SBuffInfo {
42
  bool             rebuildWindow;
43
  BuffOp           winBuffOp;
44
  SStreamStateCur* pCur;
45
} SBuffInfo;
46

47
void destroyStreamCountAggOperatorInfo(void* param) {
320✔
48
  if (param == NULL) {
320!
49
    return;
×
50
  }
51
  SStreamCountAggOperatorInfo* pInfo = (SStreamCountAggOperatorInfo*)param;
320✔
52
  cleanupBasicInfo(&pInfo->binfo);
320✔
53
  if (pInfo->pOperator) {
320!
54
    cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
320✔
55
                              &pInfo->groupResInfo);
56
    pInfo->pOperator = NULL;
320✔
57
  }
58

59
  destroyStreamBasicInfo(&pInfo->basic);
320✔
60
  destroyStreamAggSupporter(&pInfo->streamAggSup);
320✔
61
  cleanupExprSupp(&pInfo->scalarSupp);
320✔
62
  clearGroupResInfo(&pInfo->groupResInfo);
320✔
63
  taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
320✔
64
  pInfo->pUpdated = NULL;
320✔
65

66
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
320✔
67
  blockDataDestroy(pInfo->pDelRes);
320✔
68
  tSimpleHashCleanup(pInfo->pStUpdated);
320✔
69
  tSimpleHashCleanup(pInfo->pStDeleted);
320✔
70
  cleanupGroupResInfo(&pInfo->groupResInfo);
320✔
71

72
  taosArrayDestroy(pInfo->historyWins);
320✔
73
  blockDataDestroy(pInfo->pCheckpointRes);
320✔
74

75
  tSimpleHashCleanup(pInfo->pPkDeleted);
320✔
76

77
  taosMemoryFreeClear(param);
320!
78
}
79

80
bool isSlidingCountWindow(SStreamAggSupporter* pAggSup) { return pAggSup->windowCount != pAggSup->windowSliding; }
10,853✔
81

82
int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, SCountWindowInfo* pCurWin,
5,340✔
83
                          SBuffInfo* pBuffInfo, int32_t* pWinCode) {
84
  int32_t code = TSDB_CODE_SUCCESS;
5,340✔
85
  int32_t lino = 0;
5,340✔
86
  int32_t size = pAggSup->resultRowSize;
5,340✔
87
  pCurWin->winInfo.sessionWin.groupId = groupId;
5,340✔
88
  pCurWin->winInfo.sessionWin.win.skey = ts;
5,340✔
89
  pCurWin->winInfo.sessionWin.win.ekey = ts;
5,340✔
90

91
  if (isSlidingCountWindow(pAggSup)) {
5,340✔
92
    if (pBuffInfo->winBuffOp == CREATE_NEW_WINDOW) {
408✔
93
      code =
94
          pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
34✔
95
                                                     pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size);
34✔
96
      QUERY_CHECK_CODE(code, lino, _end);
34!
97

98
      *pWinCode = TSDB_CODE_FAILED;
34✔
99
    } else if (pBuffInfo->winBuffOp == MOVE_NEXT_WINDOW) {
374✔
100
      QUERY_CHECK_NULL(pBuffInfo->pCur, code, lino, _end, terrno);
17!
101
      pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pBuffInfo->pCur);
17✔
102
      *pWinCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
34✔
103
                                                                   (void**)&pCurWin->winInfo.pStatePos, &size);
17✔
104
      if (*pWinCode == TSDB_CODE_FAILED) {
17✔
105
        code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
1✔
106
                                                          pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos,
1✔
107
                                                          &size);
108
        QUERY_CHECK_CODE(code, lino, _end);
1!
109
      } else {
110
        reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
16✔
111
      }
112
    } else {
113
      pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin,
714✔
114
                                                                        pAggSup->windowCount);
357✔
115
      *pWinCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
714✔
116
                                                                   (void**)&pCurWin->winInfo.pStatePos, &size);
357✔
117
      if (*pWinCode == TSDB_CODE_FAILED) {
357✔
118
        code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
222✔
119
                                                          pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos,
222✔
120
                                                          &size);
121
        QUERY_CHECK_CODE(code, lino, _end);
222!
122
      } else {
123
        reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
135✔
124
      }
125
    }
126
    if (ts < pCurWin->winInfo.sessionWin.win.ekey) {
408✔
127
      pBuffInfo->rebuildWindow = true;
9✔
128
    }
129
  } else {
130
    code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
4,932✔
131
                                                                pAggSup->windowCount,
4,932✔
132
                                                                (void**)&pCurWin->winInfo.pStatePos, &size, pWinCode);
4,932✔
133
    QUERY_CHECK_CODE(code, lino, _end);
4,928!
134
  }
135

136
  if (*pWinCode == TSDB_CODE_SUCCESS) {
5,336✔
137
    pCurWin->winInfo.isOutput = true;
3,199✔
138
  }
139
  pCurWin->pWindowCount =
5,336✔
140
      (COUNT_TYPE*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - sizeof(COUNT_TYPE)));
5,336✔
141

142
  if (*pCurWin->pWindowCount == pAggSup->windowCount) {
5,336✔
143
    pBuffInfo->rebuildWindow = true;
31✔
144
  }
145

146
_end:
5,305✔
147
  if (code != TSDB_CODE_SUCCESS) {
5,336!
148
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
149
  }
150
  return code;
5,336✔
151
}
152

153
static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
3,386✔
154
  SSessionKey key = {0};
3,386✔
155
  getSessionHashKey(pKey, &key);
3,386✔
156
  int32_t code = tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
3,386✔
157
  if (code != TSDB_CODE_SUCCESS) {
3,384✔
158
    qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
1,728!
159
  }
160

161
  code = tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
3,386✔
162
  if (code != TSDB_CODE_SUCCESS) {
3,385✔
163
    qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
2,491!
164
  }
165
}
3,386✔
166

167
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs,
5,304✔
168
                                     int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated,
169
                                     SSHashObj* pStDeleted, bool* pRebuild, int32_t* pWinRows) {
170
  int32_t     code = TSDB_CODE_SUCCESS;
5,304✔
171
  int32_t     lino = 0;
5,304✔
172
  SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
5,304✔
173
  int32_t     num = 0;
5,304✔
174
  for (int32_t i = start; i < rows; i++) {
5,455✔
175
    if (pTs[i] < pWinInfo->winInfo.sessionWin.win.ekey) {
5,435✔
176
      num++;
151✔
177
    } else {
178
      break;
5,284✔
179
    }
180
  }
181
  int32_t maxNum = TMIN(maxRows - *pWinInfo->pWindowCount, rows - start);
5,304✔
182
  if (num > maxNum) {
5,304!
183
    *pRebuild = true;
×
184
  }
185
  *pWinInfo->pWindowCount += maxNum;
5,304✔
186
  bool needDelState = false;
5,304✔
187
  if (pWinInfo->winInfo.sessionWin.win.skey > pTs[start]) {
5,304✔
188
    needDelState = true;
9✔
189
    if (pStDeleted && pWinInfo->winInfo.isOutput) {
9!
190
      code = saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin);
9✔
191
      QUERY_CHECK_CODE(code, lino, _end);
9!
192
    }
193

194
    pWinInfo->winInfo.sessionWin.win.skey = pTs[start];
9✔
195
  }
196

197
  if (pWinInfo->winInfo.sessionWin.win.ekey < pTs[maxNum + start - 1]) {
5,304✔
198
    needDelState = true;
3,376✔
199
    pWinInfo->winInfo.sessionWin.win.ekey = pTs[maxNum + start - 1];
3,376✔
200
  }
201

202
  if (needDelState) {
5,304✔
203
    memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
3,385✔
204
    removeCountResult(pStUpdated, pAggSup->pResultRows, &sWinKey);
3,385✔
205
    if (pWinInfo->winInfo.pStatePos->needFree) {
3,387✔
206
      pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
45✔
207
    }
208
  }
209

210
  (*pWinRows) = maxNum;
5,306✔
211

212
_end:
5,306✔
213
  if (code != TSDB_CODE_SUCCESS) {
5,306!
214
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
215
  }
216
  return code;
5,306✔
217
}
218

219
void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, EStreamType mode, SSessionKey* pDelRange) {
94✔
220
  *pDelRange = *pKey;
94✔
221
  SStreamStateCur* pCur = NULL;
94✔
222
  if (isSlidingCountWindow(pAggSup)) {
94✔
223
    pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, pKey, pAggSup->windowCount);
16✔
224
  } else {
225
    pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey);
78✔
226
  }
227
  SSessionKey tmpKey = {.groupId = pKey->groupId, .win.ekey = INT64_MIN, .win.skey = INT64_MIN};
94✔
228
  int32_t     code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
94✔
229
  if (code != TSDB_CODE_SUCCESS) {
94!
230
    pAggSup->stateStore.streamStateFreeCur(pCur);
×
231
    return;
×
232
  }
233
  pDelRange->win = tmpKey.win;
94✔
234
  while (mode == STREAM_DELETE_DATA || mode == STREAM_PARTITION_DELETE_DATA) {
202✔
235
    pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
175✔
236
    code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
175✔
237
    if (code != TSDB_CODE_SUCCESS) {
175✔
238
      break;
67✔
239
    }
240
    pDelRange->win.ekey = TMAX(pDelRange->win.ekey, tmpKey.win.ekey);
108✔
241
  }
242
  pAggSup->stateStore.streamStateFreeCur(pCur);
94✔
243
}
244

245
static void destroySBuffInfo(SStreamAggSupporter* pAggSup, SBuffInfo* pBuffInfo) {
5,123✔
246
  pAggSup->stateStore.streamStateFreeCur(pBuffInfo->pCur);
5,123✔
247
}
5,123✔
248

249
bool inCountCalSlidingWindow(SStreamAggSupporter* pAggSup, STimeWindow* pWin, TSKEY sKey, TSKEY eKey) {
5,366✔
250
  if (pAggSup->windowCount == pAggSup->windowSliding) {
5,366✔
251
    return true;
4,953✔
252
  }
253
  if (sKey <= pWin->skey && pWin->ekey <= eKey) {
413✔
254
    return true;
408✔
255
  }
256
  return false;
5✔
257
}
258

259
bool inCountSlidingWindow(SStreamAggSupporter* pAggSup, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
5,336✔
260
  return inCountCalSlidingWindow(pAggSup, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
5,336✔
261
}
262

263
static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
5,122✔
264
                                 SSHashObj* pStDeleted) {
265
  int32_t                      code = TSDB_CODE_SUCCESS;
5,122✔
266
  int32_t                      lino = 0;
5,122✔
267
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
5,122✔
268
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
5,122✔
269
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
5,122✔
270
  uint64_t                     groupId = pSDataBlock->info.id.groupId;
5,122✔
271
  SResultRow*                  pResult = NULL;
5,122✔
272
  int32_t                      rows = pSDataBlock->info.rows;
5,122✔
273
  int32_t                      winRows = 0;
5,122✔
274
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
5,122✔
275
  SBuffInfo                    buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
5,122✔
276

277
  pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
5,122✔
278
  pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
5,122✔
279
  if (pAggSup->winRange.ekey <= 0) {
5,122!
280
    pAggSup->winRange.ekey = INT64_MAX;
×
281
  }
282

283
  SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
5,122✔
284
  if (!pStartTsCol) {
5,122!
285
    code = TSDB_CODE_FAILED;
×
286
    QUERY_CHECK_CODE(code, lino, _end);
×
287
  }
288
  TSKEY* startTsCols = (int64_t*)pStartTsCol->pData;
5,122✔
289
  code = blockDataEnsureCapacity(pAggSup->pScanBlock, rows * 2);
5,122✔
290
  QUERY_CHECK_CODE(code, lino, _end);
5,122!
291

292
  SStreamStateCur* pCur = NULL;
5,122✔
293
  COUNT_TYPE       slidingRows = 0;
5,122✔
294

295
  for (int32_t i = 0; i < rows;) {
10,433✔
296
    if (pInfo->ignoreExpiredData &&
10,256✔
297
        checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup,
4,913✔
298
                         pSDataBlock->info.id.uid, startTsCols[i], NULL, 0)) {
4,913✔
299
      i++;
3✔
300
      continue;
6✔
301
    }
302
    SCountWindowInfo curWin = {0};
5,340✔
303
    int32_t          winCode = TSDB_CODE_SUCCESS;
5,340✔
304
    buffInfo.rebuildWindow = false;
5,340✔
305
    code = setCountOutputBuf(pAggSup, startTsCols[i], groupId, &curWin, &buffInfo, &winCode);
5,340✔
306
    QUERY_CHECK_CODE(code, lino, _end);
5,336!
307

308
    if (winCode != TSDB_CODE_SUCCESS &&
5,336✔
309
        BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN)) {
2,138!
310
      code = addCountAggNotifyEvent(SNOTIFY_EVENT_WINDOW_OPEN, &curWin.winInfo.sessionWin, &pInfo->basic.notifyEventSup,
×
311
                                    pTaskInfo->streamInfo.pNotifyEventStat);
312
      QUERY_CHECK_CODE(code, lino, _end);
×
313
    }
314

315
    if (!inCountSlidingWindow(pAggSup, &curWin.winInfo.sessionWin.win, &pSDataBlock->info)) {
5,336✔
316
      buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
3✔
317
      continue;
3✔
318
    }
319
    setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
5,332✔
320
    slidingRows = *curWin.pWindowCount;
5,335✔
321
    if (!buffInfo.rebuildWindow) {
5,335✔
322
      code = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated, pStDeleted,
5,304✔
323
                                   &buffInfo.rebuildWindow, &winRows);
324
      QUERY_CHECK_CODE(code, lino, _end);
5,306!
325
    }
326
    if (buffInfo.rebuildWindow) {
5,337✔
327
      SSessionKey range = {0};
31✔
328
      if (isSlidingCountWindow(pAggSup)) {
31✔
329
        curWin.winInfo.sessionWin.win.skey = startTsCols[i];
6✔
330
        curWin.winInfo.sessionWin.win.ekey = startTsCols[i];
6✔
331
      }
332
      getCountWinRange(pAggSup, &curWin.winInfo.sessionWin, STREAM_DELETE_DATA, &range);
31✔
333
      range.win.skey = TMIN(startTsCols[i], range.win.skey);
31✔
334
      range.win.ekey = TMAX(startTsCols[rows - 1], range.win.ekey);
31✔
335
      uint64_t uid = 0;
31✔
336
      code =
337
          appendDataToSpecialBlock(pAggSup->pScanBlock, &range.win.skey, &range.win.ekey, &uid, &range.groupId, NULL);
31✔
338
      QUERY_CHECK_CODE(code, lino, _end);
31!
339
      break;
31✔
340
    }
341
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
5,306✔
342
                              pOperator, 0);
343
    QUERY_CHECK_CODE(code, lino, _end);
5,306!
344

345
    code = saveSessionOutputBuf(pAggSup, &curWin.winInfo);
5,306✔
346
    QUERY_CHECK_CODE(code, lino, _end);
5,305!
347

348
    if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_COUNT_OP(pOperator)) {
5,305!
349
      code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
1✔
350
      QUERY_CHECK_CODE(code, lino, _end);
1!
351
    }
352

353
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
5,305!
354
      code = saveResult(curWin.winInfo, pStUpdated);
4,180✔
355
      QUERY_CHECK_CODE(code, lino, _end);
4,181!
356
    }
357
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
5,306✔
358
      curWin.winInfo.pStatePos->beUpdated = true;
1,125✔
359
      SSessionKey key = {0};
1,125✔
360
      getSessionHashKey(&curWin.winInfo.sessionWin, &key);
1,125✔
361
      code =
362
          tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
1,125✔
363
      QUERY_CHECK_CODE(code, lino, _end);
1,124!
364
    }
365

366
    if (isSlidingCountWindow(pAggSup)) {
5,305✔
367
      if (slidingRows <= pAggSup->windowSliding) {
399✔
368
        if (slidingRows + winRows > pAggSup->windowSliding) {
385✔
369
          buffInfo.winBuffOp = CREATE_NEW_WINDOW;
34✔
370
          winRows = pAggSup->windowSliding - slidingRows;
34✔
371
        }
372
      } else {
373
        buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
14✔
374
        winRows = 0;
14✔
375
      }
376
    }
377
    i += winRows;
5,305✔
378
  }
379

380
_end:
5,092✔
381
  if (code != TSDB_CODE_SUCCESS) {
5,123!
382
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
383
  }
384
  destroySBuffInfo(pAggSup, &buffInfo);
5,123✔
385
}
5,123✔
386

387
static int32_t buildCountResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
5,854✔
388
  int32_t                      code = TSDB_CODE_SUCCESS;
5,854✔
389
  int32_t                      lino = 0;
5,854✔
390
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
5,854✔
391
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
5,854✔
392
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
5,854✔
393
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
5,854✔
394
  SStreamNotifyEventSupp*      pNotifySup = &pInfo->basic.notifyEventSup;
5,854✔
395
  STaskNotifyEventStat*        pNotifyEventStat = pTaskInfo->streamInfo.pNotifyEventStat;
5,854✔
396
  bool                         addNotifyEvent = false;
5,854✔
397
  addNotifyEvent = BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
5,854✔
398
  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator, &pInfo->groupResInfo);
5,854✔
399
  if (pInfo->pDelRes->info.rows > 0) {
5,854✔
400
    printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
49✔
401
    if (addNotifyEvent) {
49!
402
      code = addAggDeleteNotifyEvent(pInfo->pDelRes, pNotifySup, pNotifyEventStat);
×
403
      QUERY_CHECK_CODE(code, lino, _end);
×
404
    }
405
    (*ppRes) = pInfo->pDelRes;
49✔
406
    return code;
49✔
407
  }
408

409
  doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes,
5,805!
410
                       addNotifyEvent ? pNotifySup->pSessionKeys : NULL);
411
  if (pBInfo->pRes->info.rows > 0) {
5,807✔
412
    printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
2,347✔
413
    if (addNotifyEvent) {
2,347!
414
      code = addAggResultNotifyEvent(pBInfo->pRes, pNotifySup->pSessionKeys, pTaskInfo->streamInfo.notifyResultSchema,
×
415
                                     pNotifySup, pNotifyEventStat);
416
      QUERY_CHECK_CODE(code, lino, _end);
×
417
    }
418
    (*ppRes) = pBInfo->pRes;
2,347✔
419
    return code;
2,347✔
420
  }
421

422
  code = buildNotifyEventBlock(pTaskInfo, pNotifySup, pNotifyEventStat);
3,460✔
423
  QUERY_CHECK_CODE(code, lino, _end);
3,459!
424
  if (pNotifySup->pEventBlock && pNotifySup->pEventBlock->info.rows > 0) {
3,459!
425
    printDataBlock(pNotifySup->pEventBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
426
    (*ppRes) = pNotifySup->pEventBlock;
×
427
    return code;
×
428
  }
429

430
  code = removeOutdatedNotifyEvents(&pInfo->twAggSup, pNotifySup, pNotifyEventStat);
3,459✔
431
  QUERY_CHECK_CODE(code, lino, _end);
3,460!
432

433
_end:
3,460✔
434
  if (code != TSDB_CODE_SUCCESS) {
3,460!
435
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
436
  }
437
  (*ppRes) = NULL;
3,460✔
438
  return code;
3,460✔
439
}
440

UNCOV
441
int32_t doStreamCountEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
×
UNCOV
442
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
×
UNCOV
443
  if (!pInfo) {
×
444
    return 0;
×
445
  }
446

UNCOV
447
  void* pData = (buf == NULL) ? NULL : *buf;
×
448

449
  // 1.streamAggSup.pResultRows
UNCOV
450
  int32_t tlen = 0;
×
UNCOV
451
  int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
×
UNCOV
452
  tlen += taosEncodeFixedI32(buf, mapSize);
×
UNCOV
453
  void*   pIte = NULL;
×
UNCOV
454
  size_t  keyLen = 0;
×
UNCOV
455
  int32_t iter = 0;
×
UNCOV
456
  while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
×
UNCOV
457
    void* key = tSimpleHashGetKey(pIte, &keyLen);
×
UNCOV
458
    tlen += encodeSSessionKey(buf, key);
×
UNCOV
459
    tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
×
460
  }
461

462
  // 2.twAggSup
UNCOV
463
  tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
×
464

465
  // 3.dataVersion
UNCOV
466
  tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
×
467

468
  // 4.basicInfo
UNCOV
469
  tlen += encodeStreamBasicInfo(buf, &pInfo->basic);
×
470

471
  // 5.checksum
UNCOV
472
  if (isParent) {
×
UNCOV
473
    if (buf) {
×
UNCOV
474
      uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
×
UNCOV
475
      tlen += taosEncodeFixedU32(buf, cksum);
×
476
    } else {
UNCOV
477
      tlen += sizeof(uint32_t);
×
478
    }
479
  }
480

UNCOV
481
  return tlen;
×
482
}
483

484
int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
×
485
  int32_t                      code = TSDB_CODE_SUCCESS;
×
486
  int32_t                      lino = 0;
×
487
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
×
488
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
×
489
  void*                        pDataEnd = POINTER_SHIFT(buf, len);
×
490
  if (!pInfo) {
×
491
    code = TSDB_CODE_FAILED;
×
492
    QUERY_CHECK_CODE(code, lino, _end);
×
493
  }
494

495
  // 5.checksum
496
  if (isParent) {
×
497
    int32_t dataLen = len - sizeof(uint32_t);
×
498
    void*   pCksum = POINTER_SHIFT(buf, dataLen);
×
499
    if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
×
500
      code = TSDB_CODE_FAILED;
×
501
      QUERY_CHECK_CODE(code, lino, _end);
×
502
    }
503
    pDataEnd = pCksum;
×
504
  }
505

506
  // 1.streamAggSup.pResultRows
507
  int32_t mapSize = 0;
×
508
  buf = taosDecodeFixedI32(buf, &mapSize);
×
509
  for (int32_t i = 0; i < mapSize; i++) {
×
510
    SSessionKey      key = {0};
×
511
    SCountWindowInfo curWin = {0};
×
512
    int32_t          winCode = TSDB_CODE_SUCCESS;
×
513
    buf = decodeSSessionKey(buf, &key);
×
514
    SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
×
515
    code = setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo, &winCode);
×
516
    QUERY_CHECK_CODE(code, lino, _end);
×
517

518
    buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
×
519
    code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo,
×
520
                          sizeof(SResultWindowInfo));
521
    QUERY_CHECK_CODE(code, lino, _end);
×
522
  }
523

524
  // 2.twAggSup
525
  buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
×
526

527
  // 3.dataVersion
528
  buf = taosDecodeFixedI64(buf, &pInfo->dataVersion);
×
529

530
  // 4.basicInfo
531
  if (buf < pDataEnd) {
×
532
    code = decodeStreamBasicInfo(&buf, &pInfo->basic);
×
533
    QUERY_CHECK_CODE(code, lino, _end);
×
534
  }
535

536
_end:
×
537
  if (code != TSDB_CODE_SUCCESS) {
×
538
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
539
  }
540
  return code;
×
541
}
542

543
void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
79✔
544
  int32_t                      code = TSDB_CODE_SUCCESS;
79✔
545
  int32_t                      lino = 0;
79✔
546
  void*                        pBuf = NULL;
79✔
547
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
79✔
548
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
79✔
549
  if (needSaveStreamOperatorInfo(&pInfo->basic)) {
79!
UNCOV
550
    int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true);
×
UNCOV
551
    pBuf = taosMemoryCalloc(1, len);
×
UNCOV
552
    if (!pBuf) {
×
553
      code = terrno;
×
554
      QUERY_CHECK_CODE(code, lino, _end);
×
555
    }
UNCOV
556
    void* pTmpBuf = pBuf;
×
UNCOV
557
    len = doStreamCountEncodeOpState(&pTmpBuf, len, pOperator, true);
×
UNCOV
558
    pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
×
559
                                                       strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), pBuf, len);
UNCOV
560
    saveStreamOperatorStateComplete(&pInfo->basic);
×
561
  }
562

563
_end:
79✔
564
  taosMemoryFreeClear(pBuf);
79!
565
  if (code != TSDB_CODE_SUCCESS) {
79!
566
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
567
  }
568
}
79✔
569

570
void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) {
27✔
571
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
27✔
572
  TSKEY*           startDatas = (TSKEY*)pStartTsCol->pData;
27✔
573
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
27✔
574
  TSKEY*           endDatas = (TSKEY*)pEndTsCol->pData;
27✔
575
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
27✔
576
  TSKEY*           calStartDatas = (TSKEY*)pStartTsCol->pData;
27✔
577
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
27✔
578
  TSKEY*           calEndDatas = (TSKEY*)pEndTsCol->pData;
27✔
579
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
27✔
580
  uint64_t*        gpDatas = (uint64_t*)pGroupCol->pData;
27✔
581

582
  SRowBuffPos* pPos = NULL;
27✔
583
  int32_t      size = 0;
27✔
584
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
54✔
585
    SSessionKey      key = {.groupId = gpDatas[i], .win.skey = startDatas[i], .win.ekey = endDatas[i]};
27✔
586
    SStreamStateCur* pCur = NULL;
27✔
587
    if (isSlidingCountWindow(pAggSup)) {
27✔
588
      pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &key, pAggSup->windowCount);
2✔
589
    } else {
590
      pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, &key);
25✔
591
    }
592
    while (1) {
30✔
593
      SSessionKey tmpKey = {.groupId = gpDatas[i], .win.skey = INT64_MIN, .win.ekey = INT64_MIN};
57✔
594
      int32_t     code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, (void**)&pPos, &size);
57✔
595
      if (code != TSDB_CODE_SUCCESS || tmpKey.win.skey > endDatas[i]) {
57✔
596
        pAggSup->stateStore.streamStateFreeCur(pCur);
27✔
597
        break;
27✔
598
      }
599
      if (!inCountCalSlidingWindow(pAggSup, &tmpKey.win, calStartDatas[i], calEndDatas[i])) {
30✔
600
        pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
2✔
601
        continue;
2✔
602
      }
603
      pAggSup->stateStore.streamStateSessionReset(pAggSup->pState, pPos->pRowBuff);
28✔
604
      pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
28✔
605
    }
606
  }
607
}
27✔
608

609
int32_t doDeleteCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
12✔
610
  int32_t          code = TSDB_CODE_SUCCESS;
12✔
611
  int32_t          lino = 0;
12✔
612
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
12✔
613
  TSKEY*           startDatas = (TSKEY*)pStartTsCol->pData;
12✔
614
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
12✔
615
  TSKEY*           endDatas = (TSKEY*)pEndTsCol->pData;
12✔
616
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
12✔
617
  TSKEY*           calStartDatas = (TSKEY*)pStartTsCol->pData;
12✔
618
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
12✔
619
  TSKEY*           calEndDatas = (TSKEY*)pEndTsCol->pData;
12✔
620
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
12✔
621
  uint64_t*        gpDatas = (uint64_t*)pGroupCol->pData;
12✔
622
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
26✔
623
    SSessionKey key = {.win.skey = startDatas[i], .win.ekey = endDatas[i], .groupId = gpDatas[i]};
14✔
624
    while (1) {
24✔
625
      SSessionKey curWin = {0};
38✔
626
      int32_t     winCode = pAggSup->stateStore.streamStateCountGetKeyByRange(pAggSup->pState, &key, &curWin);
38✔
627
      if (winCode != TSDB_CODE_SUCCESS) {
38✔
628
        break;
14✔
629
      }
630
      doDeleteSessionWindow(pAggSup, &curWin);
24✔
631
      if (result) {
24!
632
        code = saveDeleteInfo(result, curWin);
24✔
633
        QUERY_CHECK_CODE(code, lino, _end);
24!
634
      }
635
    }
636
  }
637

638
_end:
12✔
639
  if (code != TSDB_CODE_SUCCESS) {
12!
640
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
641
  }
642
  return code;
12✔
643
}
644

645
int32_t deleteCountWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
60✔
646
                            SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd) {
647
  int32_t code = TSDB_CODE_SUCCESS;
60✔
648
  int32_t lino = 0;
60✔
649
  SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
60✔
650
  if (!pWins) {
60!
651
    code = terrno;
×
652
    QUERY_CHECK_CODE(code, lino, _end);
×
653
  }
654

655
  if (isSlidingCountWindow(pAggSup)) {
60✔
656
    code = doDeleteCountWindows(pAggSup, pBlock, pWins);
12✔
657
    QUERY_CHECK_CODE(code, lino, _end);
12!
658
  } else {
659
    code = doDeleteTimeWindows(pAggSup, pBlock, pWins);
48✔
660
    QUERY_CHECK_CODE(code, lino, _end);
48!
661
  }
662
  removeSessionResults(pAggSup, pMapUpdate, pWins);
60✔
663
  code = copyDeleteWindowInfo(pWins, pMapDelete);
60✔
664
  QUERY_CHECK_CODE(code, lino, _end);
60!
665
  if (needAdd) {
60!
666
    code = copyDeleteWindowInfo(pWins, pPkDelete);
×
667
    QUERY_CHECK_CODE(code, lino, _end);
×
668
  }
669
  taosArrayDestroy(pWins);
60✔
670

671
_end:
60✔
672
  if (code != TSDB_CODE_SUCCESS) {
60!
673
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
674
  }
675
  return code;
60✔
676
}
677

678
static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
7,547✔
679
  int32_t                      code = TSDB_CODE_SUCCESS;
7,547✔
680
  int32_t                      lino = 0;
7,547✔
681
  SExprSupp*                   pSup = &pOperator->exprSupp;
7,547✔
682
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
7,547✔
683
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
7,547✔
684
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
7,547✔
685
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
7,547✔
686
  qDebug("stask:%s  %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
7,547✔
687
  if (pOperator->status == OP_EXEC_DONE) {
7,547!
688
    (*ppRes) = NULL;
×
689
    return code;
×
690
  } else if (pOperator->status == OP_RES_TO_RETURN) {
7,547✔
691
    SSDataBlock* opRes = NULL;
2,396✔
692
    code = buildCountResult(pOperator, &opRes);
2,396✔
693
    QUERY_CHECK_CODE(code, lino, _end);
2,396!
694
    if (opRes) {
2,396✔
695
      (*ppRes) = opRes;
1,205✔
696
      return code;
2,396✔
697
    }
698

699
    if (pInfo->recvGetAll) {
1,191!
700
      pInfo->recvGetAll = false;
×
701
      resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
×
702
    }
703

704
    if (pInfo->reCkBlock) {
1,191!
705
      pInfo->reCkBlock = false;
×
706
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
707
      (*ppRes) = pInfo->pCheckpointRes;
×
708
      return code;
×
709
    }
710

711
    setStreamOperatorCompleted(pOperator);
1,191✔
712
    (*ppRes) = NULL;
1,191✔
713
    return code;
1,191✔
714
  }
715

716
  SOperatorInfo* downstream = pOperator->pDownstream[0];
5,151✔
717
  if (!pInfo->pUpdated) {
5,151✔
718
    pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
3,460✔
719
    QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
3,460!
720
  }
721
  if (!pInfo->pStUpdated) {
5,151✔
722
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
3,460✔
723
    pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
3,460✔
724
    QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
3,459!
725
  }
726
  while (1) {
5,289✔
727
    SSDataBlock* pBlock = NULL;
10,439✔
728
    code = downstream->fpSet.getNextFn(downstream, &pBlock);
10,439✔
729
    QUERY_CHECK_CODE(code, lino, _end);
10,437!
730

731
    if (pBlock == NULL) {
10,437✔
732
      break;
3,459✔
733
    }
734

735
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
6,978✔
736
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);
6,979✔
737

738
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
6,979✔
739
      bool add = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator);
60!
740
      code = deleteCountWinState(&pInfo->streamAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, pInfo->pPkDeleted,
60✔
741
                                 add);
742
      QUERY_CHECK_CODE(code, lino, _end);
60!
743
      continue;
166✔
744
    } else if (pBlock->info.type == STREAM_CLEAR) {
6,919✔
745
      doResetCountWindows(&pInfo->streamAggSup, pBlock);
27✔
746
      continue;
27✔
747
    } else if (pBlock->info.type == STREAM_GET_ALL) {
6,892!
748
      pInfo->recvGetAll = true;
×
749
      code = getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
×
750
      QUERY_CHECK_CODE(code, lino, _end);
×
751
      continue;
×
752
    } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
6,892✔
753
      (*ppRes) = pBlock;
1,691✔
754
      return code;
1,691✔
755
    } else if (pBlock->info.type == STREAM_CHECKPOINT) {
5,201✔
756
      pAggSup->stateStore.streamStateCommit(pAggSup->pState);
79✔
757
      doStreamCountSaveCheckpoint(pOperator);
79✔
758
      code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
79✔
759
      QUERY_CHECK_CODE(code, lino, _end);
79!
760
      continue;
79✔
761
    } else {
762
      if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
5,122!
763
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
764
        QUERY_CHECK_CODE(code, lino, _end);
×
765
      }
766
    }
767

768
    if (pInfo->scalarSupp.pExprInfo != NULL) {
5,122!
769
      SExprSupp* pExprSup = &pInfo->scalarSupp;
×
770
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
×
771
      QUERY_CHECK_CODE(code, lino, _end);
×
772
    }
773
    // the pDataBlock are always the same one, no need to call this again
774
    code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5,122✔
775
    QUERY_CHECK_CODE(code, lino, _end);
5,122!
776
    doStreamCountAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted);
5,122✔
777
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
5,123✔
778
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
5,123✔
779
  }
780
  // restore the value
781
  pOperator->status = OP_RES_TO_RETURN;
3,459✔
782

783
  code = closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
3,459✔
784
  QUERY_CHECK_CODE(code, lino, _end);
3,459!
785

786
  code = copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
3,459✔
787
  QUERY_CHECK_CODE(code, lino, _end);
3,459!
788

789
  removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated);
3,459✔
790
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
3,459✔
791
  pInfo->pUpdated = NULL;
3,459✔
792
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
3,459✔
793
  QUERY_CHECK_CODE(code, lino, _end);
3,458!
794

795
  if (pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator)) {
3,458!
796
    code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
3✔
797
    QUERY_CHECK_CODE(code, lino, _end);
3!
798
  }
799

800
  SSDataBlock* opRes = NULL;
3,458✔
801
  code = buildCountResult(pOperator, &opRes);
3,458✔
802
  QUERY_CHECK_CODE(code, lino, _end);
3,460!
803
  if (opRes) {
3,460✔
804
    (*ppRes) = opRes;
1,191✔
805
    return code;
1,191✔
806
  }
807

808
_end:
2,269✔
809
  if (code != TSDB_CODE_SUCCESS) {
2,269!
810
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
811
    pTaskInfo->code = code;
×
812
    T_LONG_JMP(pTaskInfo->env, code);
×
813
  }
814
  setStreamOperatorCompleted(pOperator);
2,269✔
815
  (*ppRes) = NULL;
2,269✔
816
  return code;
2,269✔
817
}
818

819
void streamCountReleaseState(SOperatorInfo* pOperator) {
78✔
820
  int32_t                      code = TSDB_CODE_SUCCESS;
78✔
821
  int32_t                      lino = 0;
78✔
822
  SStreamEventAggOperatorInfo* pInfo = pOperator->info;
78✔
823
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
78✔
824
  int32_t                      resSize = sizeof(TSKEY);
78✔
825
  char*                        pBuff = taosMemoryCalloc(1, resSize);
78!
826
  QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);
79!
827

828
  memcpy(pBuff, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
79✔
829
  qDebug("===stream=== count window operator relase state. ");
79✔
830
  pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_STATE_NAME,
79✔
831
                                                     strlen(STREAM_COUNT_OP_STATE_NAME), pBuff, resSize);
832
  pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
79✔
833
  taosMemoryFreeClear(pBuff);
79!
834
  SOperatorInfo* downstream = pOperator->pDownstream[0];
79✔
835
  if (downstream->fpSet.releaseStreamStateFn) {
79!
836
    downstream->fpSet.releaseStreamStateFn(downstream);
79✔
837
  }
838
_end:
×
839
  if (code != TSDB_CODE_SUCCESS) {
79!
840
    terrno = code;
×
841
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
842
  }
843
}
79✔
844

845
void streamCountReloadState(SOperatorInfo* pOperator) {
79✔
846
  int32_t                      code = TSDB_CODE_SUCCESS;
79✔
847
  int32_t                      lino = 0;
79✔
848
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
79✔
849
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
79✔
850
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
79✔
851
  int32_t                      size = 0;
79✔
852
  void*                        pBuf = NULL;
79✔
853

854
  code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_COUNT_OP_STATE_NAME,
79✔
855
                                                strlen(STREAM_COUNT_OP_STATE_NAME), &pBuf, &size);
856
  QUERY_CHECK_CODE(code, lino, _end);
79!
857

858
  TSKEY ts = *(TSKEY*)pBuf;
79✔
859
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
79✔
860
  taosMemoryFree(pBuf);
79!
861

862
  SOperatorInfo* downstream = pOperator->pDownstream[0];
79✔
863
  if (downstream->fpSet.reloadStreamStateFn) {
79!
864
    downstream->fpSet.reloadStreamStateFn(downstream);
79✔
865
  }
866
  reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
79✔
867

868
_end:
79✔
869
  if (code != TSDB_CODE_SUCCESS) {
79!
870
    terrno = code;
×
871
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
872
  }
873
}
79✔
874

875
int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
320✔
876
                                         SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
877
  QRY_PARAM_CHECK(pOptrInfo);
320!
878

879
  SCountWinodwPhysiNode*       pCountNode = (SCountWinodwPhysiNode*)pPhyNode;
320✔
880
  int32_t                      numOfCols = 0;
320✔
881
  int32_t                      code = TSDB_CODE_SUCCESS;
320✔
882
  int32_t                      lino = 0;
320✔
883
  SStreamCountAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamCountAggOperatorInfo));
320!
884
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
320!
885
  if (pInfo == NULL || pOperator == NULL) {
320!
886
    code = terrno;
×
887
    QUERY_CHECK_CODE(code, lino, _error);
×
888
  }
889

890
  pOperator->pTaskInfo = pTaskInfo;
320✔
891

892
  initResultSizeInfo(&pOperator->resultInfo, 4096);
320✔
893
  if (pCountNode->window.pExprs != NULL) {
320!
894
    int32_t    numOfScalar = 0;
×
895
    SExprInfo* pScalarExprInfo = NULL;
×
896
    code = createExprInfo(pCountNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
×
897
    QUERY_CHECK_CODE(code, lino, _error);
×
898

899
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
×
900
    QUERY_CHECK_CODE(code, lino, _error);
×
901
  }
902
  SExprSupp* pExpSup = &pOperator->exprSupp;
320✔
903

904
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
320✔
905
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
320!
906
  pInfo->binfo.pRes = pResBlock;
320✔
907

908
  SExprInfo* pExprInfo = NULL;
320✔
909
  code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
320✔
910
  QUERY_CHECK_CODE(code, lino, _error);
320!
911

912
  code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
320✔
913
  QUERY_CHECK_CODE(code, lino, _error);
320!
914

915
  pInfo->twAggSup = (STimeWindowAggSupp){
320✔
916
      .waterMark = pCountNode->window.watermark,
320✔
917
      .calTrigger = pCountNode->window.triggerType,
320✔
918
      .maxTs = INT64_MIN,
919
      .minTs = INT64_MAX,
920
      .deleteMark = getDeleteMark(&pCountNode->window, 0),
320✔
921
  };
922

923
  pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
320✔
924
  code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
320✔
925
                                sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
926
                                GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex,
320✔
927
                                STREAM_STATE_BUFF_SORT, 1);
928
  QUERY_CHECK_CODE(code, lino, _error);
320!
929

930
  pInfo->streamAggSup.windowCount = pCountNode->windowCount;
320✔
931
  pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
320✔
932

933
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
320✔
934
  QUERY_CHECK_CODE(code, lino, _error);
320!
935

936
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
320✔
937
  pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
320✔
938
  QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno);
320!
939
  pInfo->pDelIterator = NULL;
320✔
940

941
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
320✔
942
  QUERY_CHECK_CODE(code, lino, _error);
320!
943

944
  pInfo->ignoreExpiredData = pCountNode->window.igExpired;
320✔
945
  pInfo->ignoreExpiredDataSaved = false;
320✔
946
  pInfo->pUpdated = NULL;
320✔
947
  pInfo->pStUpdated = NULL;
320✔
948
  pInfo->dataVersion = 0;
320✔
949
  pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
320✔
950
  if (!pInfo->historyWins) {
320!
951
    code = terrno;
×
952
    QUERY_CHECK_CODE(code, lino, _error);
×
953
  }
954

955
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
320✔
956
  QUERY_CHECK_CODE(code, lino, _error);
320!
957

958
  pInfo->recvGetAll = false;
320✔
959
  pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
320✔
960
  QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
320!
961
  pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimaryKey;
320✔
962

963
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
320✔
964
  setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,
320✔
965
                  OP_NOT_OPENED, pInfo, pTaskInfo);
966
  // for stream
967
  void*   buff = NULL;
320✔
968
  int32_t len = 0;
320✔
969
  int32_t res =
970
      pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
320✔
971
                                                        strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), &buff, &len);
972
  if (res == TSDB_CODE_SUCCESS) {
320!
973
    code = doStreamCountDecodeOpState(buff, len, pOperator, true);
×
974
    QUERY_CHECK_CODE(code, lino, _error);
×
975
    taosMemoryFree(buff);
×
976
  }
977
  pInfo->pOperator = pOperator;
320✔
978
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAggNext, NULL, destroyStreamCountAggOperatorInfo,
320✔
979
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
980
  setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
320✔
981

982
  code = initStreamBasicInfo(&pInfo->basic, pOperator);
320✔
983
  QUERY_CHECK_CODE(code, lino, _error);
320!
984

985
  if (downstream) {
320!
986
    code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
320✔
987
                          &pInfo->twAggSup, &pInfo->basic);
320✔
988
    QUERY_CHECK_CODE(code, lino, _error);
320!
989

990
    code = appendDownstream(pOperator, &downstream, 1);
320✔
991
    QUERY_CHECK_CODE(code, lino, _error);
320!
992
  }
993

994
  *pOptrInfo = pOperator;
320✔
995
  return TSDB_CODE_SUCCESS;
320✔
996

997
_error:
×
998
  if (pInfo != NULL) {
×
999
    destroyStreamCountAggOperatorInfo(pInfo);
×
1000
  }
1001

1002
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1003
  pTaskInfo->code = code;
×
1004
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1005
  return code;
×
1006
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc