• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3535

23 Nov 2024 02:07AM UTC coverage: 60.85% (+0.03%) from 60.825%
#3535

push

travis-ci

web-flow
Merge pull request #28893 from taosdata/doc/internal

refact: rename taos lib name

120252 of 252737 branches covered (47.58%)

Branch coverage included in aggregate %.

201187 of 275508 relevant lines covered (73.02%)

15886166.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.86
/source/libs/executor/src/streamcountwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "function.h"
17
#include "functionMgt.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "streamexecutorInt.h"
21
#include "tchecksum.h"
22
#include "tcommon.h"
23
#include "tdatablock.h"
24
#include "tglobal.h"
25
#include "tlog.h"
26
#include "ttime.h"
27

28
// True when `op` (a pointer to an operator) has operatorType QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT.
#define IS_NORMAL_COUNT_OP(op)          ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT)
29
// Name string for the count operator's history state (not referenced in the visible part of this file).
#define STREAM_COUNT_OP_STATE_NAME      "StreamCountHistoryState"
30
// Key under which doStreamCountSaveCheckpoint() stores the encoded operator state.
#define STREAM_COUNT_OP_CHECKPOINT_NAME "StreamCountOperator_Checkpoint"
31

32
// A count window as handled by this operator: the generic result-window info plus a
// pointer to the window's row counter.
typedef struct SCountWindowInfo {
33
  // Window key, state-buffer position and output flag of the window.
  SResultWindowInfo winInfo;
34
  // Set by setCountOutputBuf() to the last sizeof(COUNT_TYPE) bytes of the window's row buffer.
  COUNT_TYPE*       pWindowCount;
35
} SCountWindowInfo;
36

37
// How setCountOutputBuf() should obtain the output buffer for a sliding count window.
typedef enum {
38
  // Seek from the row timestamp (streamStateCountSeekKeyPrev) and add a window if none is found.
  NONE_WINDOW = 0,
39
  // Always add a new window to the state store.
  CREATE_NEW_WINDOW,
40
  // Advance the existing cursor to the next window; add one if the cursor has no entry.
  MOVE_NEXT_WINDOW,
41
} BuffOp;
42
// Per-block scratch state shared between doStreamCountAggImpl() and setCountOutputBuf().
typedef struct SBuffInfo {
43
  // Set when the located window cannot simply take the incoming rows (row ts is before the
  // window's ekey, or the window is already full); the caller then emits a rescan range.
  bool             rebuildWindow;
44
  // Buffer operation to perform on the next setCountOutputBuf() call.
  BuffOp           winBuffOp;
45
  // State-store cursor opened by setCountOutputBuf(); released by destroySBuffInfo().
  SStreamStateCur* pCur;
46
} SBuffInfo;
47

48
void destroyStreamCountAggOperatorInfo(void* param) {
290✔
49
  if (param == NULL) {
290!
50
    return;
×
51
  }
52
  SStreamCountAggOperatorInfo* pInfo = (SStreamCountAggOperatorInfo*)param;
290✔
53
  cleanupBasicInfo(&pInfo->binfo);
290✔
54
  if (pInfo->pOperator) {
290!
55
    cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
290✔
56
                              &pInfo->groupResInfo);
57
    pInfo->pOperator = NULL;
290✔
58
  }
59
  destroyStreamAggSupporter(&pInfo->streamAggSup);
290✔
60
  cleanupExprSupp(&pInfo->scalarSupp);
290✔
61
  clearGroupResInfo(&pInfo->groupResInfo);
290✔
62
  taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
290✔
63
  pInfo->pUpdated = NULL;
290✔
64

65
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
290✔
66
  blockDataDestroy(pInfo->pDelRes);
290✔
67
  tSimpleHashCleanup(pInfo->pStUpdated);
290✔
68
  tSimpleHashCleanup(pInfo->pStDeleted);
290✔
69
  cleanupGroupResInfo(&pInfo->groupResInfo);
290✔
70

71
  taosArrayDestroy(pInfo->historyWins);
290✔
72
  blockDataDestroy(pInfo->pCheckpointRes);
290✔
73

74
  tSimpleHashCleanup(pInfo->pPkDeleted);
290✔
75

76
  taosMemoryFreeClear(param);
290!
77
}
78

79
/**
 * Tell whether the count window slides, i.e. its sliding step differs from its row count.
 *
 * @param pAggSup  aggregate supporter holding windowCount and windowSliding.
 * @return true when windowCount != windowSliding.
 */
bool isSlidingCountWindow(SStreamAggSupporter* pAggSup) {
  return !(pAggSup->windowCount == pAggSup->windowSliding);
}
80

81
int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, SCountWindowInfo* pCurWin,
5,140✔
82
                          SBuffInfo* pBuffInfo) {
83
  int32_t code = TSDB_CODE_SUCCESS;
5,140✔
84
  int32_t lino = 0;
5,140✔
85
  int32_t winCode = TSDB_CODE_SUCCESS;
5,140✔
86
  int32_t size = pAggSup->resultRowSize;
5,140✔
87
  pCurWin->winInfo.sessionWin.groupId = groupId;
5,140✔
88
  pCurWin->winInfo.sessionWin.win.skey = ts;
5,140✔
89
  pCurWin->winInfo.sessionWin.win.ekey = ts;
5,140✔
90

91
  if (isSlidingCountWindow(pAggSup)) {
5,140✔
92
    if (pBuffInfo->winBuffOp == CREATE_NEW_WINDOW) {
100✔
93
      code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
33✔
94
                                                        (void**)&pCurWin->winInfo.pStatePos, &size);
33✔
95
      QUERY_CHECK_CODE(code, lino, _end);
33!
96

97
      winCode = TSDB_CODE_FAILED;
33✔
98
    } else if (pBuffInfo->winBuffOp == MOVE_NEXT_WINDOW) {
67✔
99
      QUERY_CHECK_NULL(pBuffInfo->pCur, code, lino, _end, terrno);
17!
100
      pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pBuffInfo->pCur);
17✔
101
      winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
34✔
102
                                                                 (void**)&pCurWin->winInfo.pStatePos, &size);
17✔
103
      if (winCode == TSDB_CODE_FAILED) {
17✔
104
        code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
1✔
105
                                                          (void**)&pCurWin->winInfo.pStatePos, &size);
1✔
106
        QUERY_CHECK_CODE(code, lino, _end);
1!
107
      } else {
108
        reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
16✔
109
      }
110
    } else {
111
      pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin,
100✔
112
                                                                        pAggSup->windowCount);
50✔
113
      winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
100✔
114
                                                                 (void**)&pCurWin->winInfo.pStatePos, &size);
50✔
115
      if (winCode == TSDB_CODE_FAILED) {
50✔
116
        code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
8✔
117
                                                          (void**)&pCurWin->winInfo.pStatePos, &size);
8✔
118
        QUERY_CHECK_CODE(code, lino, _end);
8!
119
      } else {
120
        reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
42✔
121
      }
122
    }
123
    if (ts < pCurWin->winInfo.sessionWin.win.ekey) {
101✔
124
      pBuffInfo->rebuildWindow = true;
8✔
125
    }
126
  } else {
127
    code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
5,039✔
128
                                                                pAggSup->windowCount,
5,039✔
129
                                                                (void**)&pCurWin->winInfo.pStatePos, &size, &winCode);
5,039✔
130
    QUERY_CHECK_CODE(code, lino, _end);
5,041!
131
  }
132

133
  if (winCode == TSDB_CODE_SUCCESS) {
5,142✔
134
    pCurWin->winInfo.isOutput = true;
2,971✔
135
  }
136
  pCurWin->pWindowCount =
5,142✔
137
      (COUNT_TYPE*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - sizeof(COUNT_TYPE)));
5,142✔
138

139
  if (*pCurWin->pWindowCount == pAggSup->windowCount) {
5,142✔
140
    pBuffInfo->rebuildWindow = true;
47✔
141
  }
142

143
_end:
5,095✔
144
  if (code != TSDB_CODE_SUCCESS) {
5,142!
145
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
146
  }
147
  return code;
5,142✔
148
}
149

150
/**
 * Drop the entry for `pKey` from both the updated-window map and the result-row map.
 * A failed removal is only logged at info level; it is not treated as an error.
 *
 * @param pHashMap  first map to remove from (the caller passes the updated-window map).
 * @param pResMap   second map to remove from (the caller passes pAggSup->pResultRows).
 * @param pKey      session key of the window; converted with getSessionHashKey() first.
 */
static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
  SSessionKey hashKey = {0};
  getSessionHashKey(pKey, &hashKey);

  int32_t rmCode = tSimpleHashRemove(pHashMap, &hashKey, sizeof(SSessionKey));
  if (rmCode != TSDB_CODE_SUCCESS) {
    qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rmCode));
  }

  rmCode = tSimpleHashRemove(pResMap, &hashKey, sizeof(SSessionKey));
  if (rmCode != TSDB_CODE_SUCCESS) {
    qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rmCode));
  }
}
163

164
/**
 * Absorb as many rows as fit from pTs[start..rows) into the window and update its key.
 *
 * @param pAggSup     aggregate supporter (state store, result-row map).
 * @param pWinInfo    window being filled; its row counter and [skey, ekey] are updated.
 * @param pTs         timestamp column of the block.
 * @param start       index of the first row to consider.
 * @param rows        number of rows in the block.
 * @param maxRows     capacity of one window.
 * @param pStUpdated  updated-window map; the entry under the old key is removed if the key changes.
 * @param pStDeleted  optional map receiving the old window when its start key moves backwards.
 * @param pRebuild    [out] set to true when more rows precede the window end than the window can take.
 * @param pWinRows    [out] number of rows absorbed (written on success only).
 * @return TSDB_CODE_SUCCESS or the error from saveDeleteRes().
 *
 * NOTE(review): pTs[take + start - 1] is read unconditionally, so this relies on at least one
 * row being absorbable; the visible caller skips this function when the window is already full.
 */
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs,
                                     int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated,
                                     SSHashObj* pStDeleted, bool* pRebuild, int32_t* pWinRows) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SResultWindowInfo* pRes = &pWinInfo->winInfo;
  SSessionKey        prevKey = pRes->sessionWin;  // key before any adjustment below

  // Count the leading rows whose timestamp lies before the current window end.
  int32_t earlier = 0;
  for (int32_t i = start; i < rows && pTs[i] < pRes->sessionWin.win.ekey; ++i) {
    earlier++;
  }

  // Rows we can take: limited by the free slots in the window and by what is left in the block.
  int32_t       take = TMIN(maxRows - *pWinInfo->pWindowCount, rows - start);
  const int32_t lastIdx = take + start - 1;
  if (earlier > take) {
    *pRebuild = true;
  }
  *pWinInfo->pWindowCount += take;

  bool keyChanged = false;
  if (pTs[start] < pRes->sessionWin.win.skey) {
    keyChanged = true;
    // The previously emitted window (old key) must be retracted downstream.
    if (pStDeleted && pRes->isOutput) {
      code = saveDeleteRes(pStDeleted, pRes->sessionWin);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    pRes->sessionWin.win.skey = pTs[start];
  }

  if (pRes->sessionWin.win.ekey < pTs[lastIdx]) {
    keyChanged = true;
    pRes->sessionWin.win.ekey = pTs[lastIdx];
  }

  if (keyChanged) {
    // Re-key the state position and forget everything registered under the old key.
    memcpy(pRes->pStatePos->pKey, &pRes->sessionWin, sizeof(SSessionKey));
    removeCountResult(pStUpdated, pAggSup->pResultRows, &prevKey);
    if (pRes->pStatePos->needFree) {
      pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &prevKey);
    }
  }

  (*pWinRows) = take;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
215

216
/**
 * Compute the time range of the stored window(s) reachable from `pKey`.
 *
 * `pDelRange` starts as a copy of `pKey`. If the state store yields a window at the seek
 * position, its range replaces pDelRange->win; for the two delete modes the cursor is then
 * walked forward and the end key is extended to the largest ekey seen.
 *
 * @param pAggSup    aggregate supporter (state store, window size / slide).
 * @param pKey       key to seek from.
 * @param mode       stream block type; only STREAM_DELETE_DATA and STREAM_PARTITION_DELETE_DATA
 *                   trigger the forward walk.
 * @param pDelRange  [out] resulting key / range.
 */
void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, EStreamType mode, SSessionKey* pDelRange) {
  *pDelRange = *pKey;

  SStreamStateCur* pCur =
      isSlidingCountWindow(pAggSup)
          ? pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, pKey, pAggSup->windowCount)
          : pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey);

  SSessionKey winKey = {.groupId = pKey->groupId, .win.ekey = INT64_MIN, .win.skey = INT64_MIN};
  int32_t     code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &winKey, NULL, 0);
  if (code == TSDB_CODE_SUCCESS) {
    pDelRange->win = winKey.win;

    const bool walkForward = (mode == STREAM_DELETE_DATA) || (mode == STREAM_PARTITION_DELETE_DATA);
    while (walkForward) {
      pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
      code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &winKey, NULL, 0);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }
      pDelRange->win.ekey = TMAX(pDelRange->win.ekey, winKey.win.ekey);
    }
  }

  // Single release point for the cursor, whether or not a window was found.
  pAggSup->stateStore.streamStateFreeCur(pCur);
}
241

242
/**
 * Release the state-store cursor held by `pBuffInfo`.
 * Called with a NULL cursor as well (non-sliding windows never open one).
 */
static void destroySBuffInfo(SStreamAggSupporter* pAggSup, SBuffInfo* pBuffInfo) {
  SStreamStateCur* pCursor = pBuffInfo->pCur;
  pAggSup->stateStore.streamStateFreeCur(pCursor);
}
245

246
/**
 * Decide whether window `pWin` should be processed for the calculation range [sKey, eKey].
 *
 * @return true for non-sliding windows (windowCount == windowSliding) without looking at
 *         `pWin`; otherwise true only when `pWin` lies entirely inside [sKey, eKey].
 */
bool inCountCalSlidingWindow(SStreamAggSupporter* pAggSup, STimeWindow* pWin, TSKEY sKey, TSKEY eKey) {
  const bool notSliding = (pAggSup->windowCount == pAggSup->windowSliding);
  return notSliding || ((sKey <= pWin->skey) && (pWin->ekey <= eKey));
}
255

256
/**
 * Same test as inCountCalSlidingWindow(), taking the calculation range from the
 * block's calWin.
 */
bool inCountSlidingWindow(SStreamAggSupporter* pAggSup, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  const TSKEY calStart = pBlockInfo->calWin.skey;
  const TSKEY calEnd = pBlockInfo->calWin.ekey;
  return inCountCalSlidingWindow(pAggSup, pWin, calStart, calEnd);
}
259

260
/**
 * Feed one input block into the count-window aggregation.
 *
 * Rows are consumed window by window: for each position the target window is located with
 * setCountOutputBuf(), filled with updateCountWindowInfo(), aggregated with
 * doOneWindowAggImpl() and saved. When a window has to be rebuilt, a rescan range is
 * appended to pAggSup->pScanBlock and processing of the block stops.
 *
 * @param pOperator    the count-window operator.
 * @param pSDataBlock  input block; its primary timestamp column drives the windowing.
 * @param pStUpdated   map collecting updated windows (may be NULL).
 * @param pStDeleted   map collecting windows to retract (may be NULL).
 *
 * Errors are logged here; nothing is returned to the caller.
 */
static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
                                 SSHashObj* pStDeleted) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
  uint64_t                     groupId = pSDataBlock->info.id.groupId;
  int32_t                      rows = pSDataBlock->info.rows;
  SResultRow*                  pResult = NULL;
  int32_t                      winRows = 0;
  COUNT_TYPE                   rowsBefore = 0;  // rows already in the window before this block
  SColumnInfoData*             pTsCol = NULL;
  TSKEY*                       tsList = NULL;
  int32_t                      rowIdx = 0;
  SBuffInfo                    buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};

  pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);

  // Window range comes from the task's fill-history window; a non-positive end means "unbounded".
  pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
  if (pAggSup->winRange.ekey <= 0) {
    pAggSup->winRange.ekey = INT64_MAX;
  }

  pTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
  if (!pTsCol) {
    code = TSDB_CODE_FAILED;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  tsList = (TSKEY*)pTsCol->pData;

  code = blockDataEnsureCapacity(pAggSup->pScanBlock, rows * 2);
  QUERY_CHECK_CODE(code, lino, _end);

  while (rowIdx < rows) {
    // Optionally drop rows that are already expired.
    if (pInfo->ignoreExpiredData &&
        checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup,
                         pSDataBlock->info.id.uid, tsList[rowIdx], NULL, 0)) {
      rowIdx++;
      continue;
    }

    SCountWindowInfo curWin = {0};
    buffInfo.rebuildWindow = false;
    code = setCountOutputBuf(pAggSup, tsList[rowIdx], groupId, &curWin, &buffInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    // Window outside the block's calculation range: retry the same row on the next window.
    if (!inCountSlidingWindow(pAggSup, &curWin.winInfo.sessionWin.win, &pSDataBlock->info)) {
      buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
      continue;
    }

    setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
    rowsBefore = *curWin.pWindowCount;

    if (!buffInfo.rebuildWindow) {
      code = updateCountWindowInfo(pAggSup, &curWin, tsList, rowIdx, rows, pAggSup->windowCount, pStUpdated,
                                   pStDeleted, &buffInfo.rebuildWindow, &winRows);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (buffInfo.rebuildWindow) {
      // Request a rescan covering the affected windows and the rest of this block, then stop.
      SSessionKey range = {0};
      if (isSlidingCountWindow(pAggSup)) {
        curWin.winInfo.sessionWin.win.skey = tsList[rowIdx];
        curWin.winInfo.sessionWin.win.ekey = tsList[rowIdx];
      }
      getCountWinRange(pAggSup, &curWin.winInfo.sessionWin, STREAM_DELETE_DATA, &range);
      range.win.skey = TMIN(tsList[rowIdx], range.win.skey);
      range.win.ekey = TMAX(tsList[rows - 1], range.win.ekey);

      uint64_t uid = 0;
      code =
          appendDataToSpecialBlock(pAggSup->pScanBlock, &range.win.skey, &range.win.ekey, &uid, &range.groupId, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
      break;
    }

    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, rowIdx, winRows, rows,
                              numOfOutput, pOperator, 0);
    QUERY_CHECK_CODE(code, lino, _end);

    code = saveSessionOutputBuf(pAggSup, &curWin.winInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_COUNT_OP(pOperator)) {
      code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
      code = saveResult(curWin.winInfo, pStUpdated);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
      curWin.winInfo.pStatePos->beUpdated = true;
      SSessionKey key = {0};
      getSessionHashKey(&curWin.winInfo.sessionWin, &key);
      code =
          tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // Sliding windows overlap: advance by at most the part of the slide not yet consumed,
    // and choose how the next window's buffer is obtained.
    if (isSlidingCountWindow(pAggSup)) {
      if (rowsBefore > pAggSup->windowSliding) {
        buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
        winRows = 0;
      } else if (rowsBefore + winRows > pAggSup->windowSliding) {
        buffInfo.winBuffOp = CREATE_NEW_WINDOW;
        winRows = pAggSup->windowSliding - rowsBefore;
      }
    }
    rowIdx += winRows;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  destroySBuffInfo(pAggSup, &buffInfo);
}
375

376
/**
 * Produce the next output block of the operator.
 *
 * Delete results take precedence: if the delete block has rows it is returned; otherwise
 * the session result block is built and returned when non-empty.
 *
 * @param pOperator  the count-window operator.
 * @param ppRes      [out] block to emit, or NULL when nothing is pending.
 * @return always TSDB_CODE_SUCCESS in the visible code.
 */
static int32_t buildCountResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*                 pOut = NULL;

  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
  if (pInfo->pDelRes->info.rows > 0) {
    pOut = pInfo->pDelRes;
  } else {
    doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
    if (pBInfo->pRes->info.rows > 0) {
      pOut = pBInfo->pRes;
    }
  }

  if (pOut != NULL) {
    printDataBlock(pOut, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
  }
  (*ppRes) = pOut;
  return code;
}
398

399
/**
 * Serialise the operator state for a checkpoint.
 *
 * Layout: result-row map (size + key/window pairs), twAggSup, dataVersion (64-bit),
 * and, for the parent operator, a trailing 32-bit checksum.
 *
 * The function is called twice by doStreamCountSaveCheckpoint(): first with buf == NULL to
 * obtain the required length, then with a buffer of that length.
 * NOTE(review): this relies on the encode helpers accepting a NULL `buf` and only returning
 * the encoded size - confirm against their definitions.
 *
 * @param buf        in/out write cursor, or NULL for a size-only pass.
 * @param len        total buffer length (used to locate the checksum); ignored when buf is NULL.
 * @param pOperator  operator whose state is encoded.
 * @param isParent   append a checksum when true.
 * @return number of bytes encoded, or 0 when the operator has no info.
 */
int32_t doStreamCountEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  if (!pInfo) {
    return 0;
  }

  // Remember the start of the output; the checksum is computed from here.
  void* pData = (buf == NULL) ? NULL : *buf;

  // 1.streamAggSup.pResultRows
  int32_t tlen = 0;
  int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
  tlen += taosEncodeFixedI32(buf, mapSize);
  void*   pIte = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
    void* key = tSimpleHashGetKey(pIte, &keyLen);
    tlen += encodeSSessionKey(buf, key);
    tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
  }

  // 2.twAggSup
  tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);

  // 3.dataVersion
  // Must be written as 64 bits: doStreamCountDecodeOpState() reads it back with
  // taosDecodeFixedI64(). A 32-bit write truncates the version and makes the decoder
  // consume 4 bytes too many (running into the checksum).
  tlen += taosEncodeFixedI64(buf, pInfo->dataVersion);

  // 4.checksum
  if (isParent) {
    if (buf) {
      uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
      tlen += taosEncodeFixedU32(buf, cksum);
    } else {
      tlen += sizeof(uint32_t);
    }
  }

  return tlen;
}
438

439
/**
 * Restore the operator state written by doStreamCountEncodeOpState().
 *
 * For the parent operator the trailing checksum is verified first. Each stored window is
 * then re-bound to a state buffer via setCountOutputBuf(), decoded, and inserted into the
 * result-row map; twAggSup and dataVersion follow.
 *
 * @param buf        encoded state.
 * @param len        length of `buf` in bytes.
 * @param pOperator  operator to restore into.
 * @param isParent   true when `buf` ends with a checksum.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_FAILED on missing input / bad checksum, or the
 *         error from a callee.
 */
int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  if (!pInfo || buf == NULL) {
    code = TSDB_CODE_FAILED;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // 4.checksum
  if (isParent) {
    // A buffer shorter than the checksum itself cannot be valid; without this check the
    // data length below would go negative.
    if (len < (int32_t)sizeof(uint32_t)) {
      code = TSDB_CODE_FAILED;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    int32_t dataLen = len - sizeof(uint32_t);
    void*   pCksum = POINTER_SHIFT(buf, dataLen);
    if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_FAILED;
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  // 1.streamAggSup.pResultRows
  int32_t mapSize = 0;
  buf = taosDecodeFixedI32(buf, &mapSize);
  for (int32_t i = 0; i < mapSize; i++) {
    SSessionKey      key = {0};
    SCountWindowInfo curWin = {0};
    buf = decodeSSessionKey(buf, &key);
    SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
    code = setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo);
    // For sliding windows setCountOutputBuf() opens a state cursor in buffInfo.pCur.
    // Release it on success and failure alike, otherwise one cursor leaks per decoded window.
    pInfo->streamAggSup.stateStore.streamStateFreeCur(buffInfo.pCur);
    buffInfo.pCur = NULL;
    QUERY_CHECK_CODE(code, lino, _end);

    buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
    code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo,
                          sizeof(SResultWindowInfo));
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // 2.twAggSup
  buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);

  // 3.dataVersion
  buf = taosDecodeFixedI64(buf, &pInfo->dataVersion);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
488

489
/**
 * Encode the operator state and store it in the stream state under
 * STREAM_COUNT_OP_CHECKPOINT_NAME, if needSaveStreamOperatorInfo() says a save is due.
 *
 * The state is encoded twice: once without a buffer to learn the length, once into a
 * zero-initialised buffer of that length. Allocation failure is logged, not returned.
 */
void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  void*                        pBuf = NULL;

  if (!needSaveStreamOperatorInfo(&pInfo->basic)) {
    return;
  }

  // Pass 1: size only.
  int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true);
  pBuf = taosMemoryCalloc(1, len);
  if (!pBuf) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // Pass 2: encode through a scratch cursor so pBuf keeps pointing at the start.
  void* pCursor = pBuf;
  len = doStreamCountEncodeOpState(&pCursor, len, pOperator, true);
  pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
                                                     strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), pBuf, len);
  saveStreamOperatorStateComplete(&pInfo->basic);

_end:
  taosMemoryFreeClear(pBuf);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
515

516
/**
 * Reset the stored state of every count window touched by the ranges in `pBlock`.
 *
 * For each row of the block (start ts, end ts, calc start ts, calc end ts, group id) the
 * state store is walked from the start key; every window whose skey is not past the end ts
 * and that passes inCountCalSlidingWindow() for the row's calculation range is reset.
 *
 * @param pAggSup  aggregate supporter (state store, window size / slide).
 * @param pBlock   special block carrying the ranges to reset.
 */
void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) {
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  // A missing column would otherwise be dereferenced below.
  if (!pStartTsCol || !pEndTsCol || !pCalStartTsCol || !pCalEndTsCol || !pGroupCol) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_FAILED));
    return;
  }

  TSKEY*    startDatas = (TSKEY*)pStartTsCol->pData;
  TSKEY*    endDatas = (TSKEY*)pEndTsCol->pData;
  // The calculation range comes from its own columns (previously read from the
  // start/end columns by mistake, leaving the calc columns unused).
  TSKEY*    calStartDatas = (TSKEY*)pCalStartTsCol->pData;
  TSKEY*    calEndDatas = (TSKEY*)pCalEndTsCol->pData;
  uint64_t* gpDatas = (uint64_t*)pGroupCol->pData;

  SRowBuffPos* pPos = NULL;
  int32_t      size = 0;
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
    SSessionKey      key = {.groupId = gpDatas[i], .win.skey = startDatas[i], .win.ekey = endDatas[i]};
    SStreamStateCur* pCur = NULL;
    if (isSlidingCountWindow(pAggSup)) {
      pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &key, pAggSup->windowCount);
    } else {
      pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, &key);
    }
    while (1) {
      SSessionKey tmpKey = {.groupId = gpDatas[i], .win.skey = INT64_MIN, .win.ekey = INT64_MIN};
      int32_t     code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, (void**)&pPos, &size);
      // Stop at the end of the stored windows or once past the requested range.
      if (code != TSDB_CODE_SUCCESS || tmpKey.win.skey > endDatas[i]) {
        pAggSup->stateStore.streamStateFreeCur(pCur);
        break;
      }
      // Windows outside the calculation range are skipped, not reset.
      if (!inCountCalSlidingWindow(pAggSup, &tmpKey.win, calStartDatas[i], calEndDatas[i])) {
        pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
        continue;
      }
      pAggSup->stateStore.streamStateSessionReset(pAggSup->pState, pPos->pRowBuff);
      pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
    }
  }
}
554

555
/**
 * Delete every stored count window that falls in the ranges listed in `pBlock`.
 *
 * For each row (start ts, end ts, group id) windows are looked up with
 * streamStateCountGetKeyByRange() and deleted one at a time until the lookup fails.
 *
 * @param pAggSup  aggregate supporter (state store).
 * @param pBlock   special block carrying the ranges to delete.
 * @param result   optional array receiving the key of each deleted window.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_FAILED when a required column is missing, or the
 *         error from saveDeleteInfo().
 */
int32_t doDeleteCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  // A missing column would otherwise be dereferenced below.
  if (!pStartTsCol || !pEndTsCol || !pGroupCol) {
    code = TSDB_CODE_FAILED;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  TSKEY*    startDatas = (TSKEY*)pStartTsCol->pData;
  TSKEY*    endDatas = (TSKEY*)pEndTsCol->pData;
  uint64_t* gpDatas = (uint64_t*)pGroupCol->pData;

  for (int32_t i = 0; i < pBlock->info.rows; i++) {
    SSessionKey key = {.win.skey = startDatas[i], .win.ekey = endDatas[i], .groupId = gpDatas[i]};
    // Each pass removes the window it finds, so the loop ends when the range is empty.
    while (1) {
      SSessionKey curWin = {0};
      int32_t     winCode = pAggSup->stateStore.streamStateCountGetKeyByRange(pAggSup->pState, &key, &curWin);
      if (winCode != TSDB_CODE_SUCCESS) {
        break;
      }
      doDeleteSessionWindow(pAggSup, &curWin);
      if (result) {
        code = saveDeleteInfo(result, curWin);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
590

591
/**
 * Delete the windows named by `pBlock` from the state and propagate the deletions.
 *
 * Deleted window keys are collected in a temporary array, removed from the session results
 * (removeSessionResults with `pMapUpdate`), copied into `pMapDelete` and, when `needAdd`
 * is set, also into `pPkDelete`.
 *
 * @param pAggSup     aggregate supporter.
 * @param pBlock      special block carrying the ranges to delete.
 * @param pMapUpdate  map of updated windows to purge.
 * @param pMapDelete  map receiving the deleted windows.
 * @param pPkDelete   second map receiving the deleted windows when `needAdd` is true.
 * @param needAdd     whether to fill `pPkDelete`.
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t deleteCountWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
                            SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
  if (!pWins) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (isSlidingCountWindow(pAggSup)) {
    code = doDeleteCountWindows(pAggSup, pBlock, pWins);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    code = doDeleteTimeWindows(pAggSup, pBlock, pWins);
    QUERY_CHECK_CODE(code, lino, _end);
  }
  removeSessionResults(pAggSup, pMapUpdate, pWins);
  code = copyDeleteWindowInfo(pWins, pMapDelete);
  QUERY_CHECK_CODE(code, lino, _end);
  if (needAdd) {
    code = copyDeleteWindowInfo(pWins, pPkDelete);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  // Released here so that every error path above frees the array too (it used to leak
  // whenever a check jumped to _end).
  if (pWins != NULL) {
    taosArrayDestroy(pWins);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
623

624
/**
 * Fetch the next output block of the stream count-window aggregate operator.
 *
 * Depending on pOperator->status the call either returns nothing (operator done), keeps draining
 * results that were prepared by an earlier call, or pulls every available block from the first
 * downstream operator, dispatches each one on its block type, and then builds the first result.
 *
 * @param pOperator the count-window operator
 * @param ppRes     receives the block to return, or NULL when there is nothing to return
 * @return TSDB_CODE_SUCCESS on the normal paths; on failure the error is logged, stored in the
 *         task info, and T_LONG_JMP is invoked with the error code.
 */
static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
  SExprSupp*                   pSup = &pOperator->exprSupp;

  qDebug("stask:%s  %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);

  // Nothing more to produce.
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  // Results from a previous call are still pending: keep handing them out.
  if (pOperator->status == OP_RES_TO_RETURN) {
    SSDataBlock* pPending = NULL;
    code = buildCountResult(pOperator, &pPending);
    QUERY_CHECK_CODE(code, lino, _end);
    if (pPending != NULL) {
      (*ppRes) = pPending;
      return code;
    }

    if (pInfo->recvGetAll) {
      pInfo->recvGetAll = false;
      resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
    }

    if (pInfo->reCkBlock) {
      pInfo->reCkBlock = false;
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      (*ppRes) = pInfo->pCheckpointRes;
      return code;
    }

    setStreamOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

  SOperatorInfo* pChild = pOperator->pDownstream[0];

  // Lazily create the containers that collect the windows touched during this round.
  if (pInfo->pUpdated == NULL) {
    pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
    QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
  }
  if (pInfo->pStUpdated == NULL) {
    _hash_fn_t binaryHash = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
    pInfo->pStUpdated = tSimpleHashInit(64, binaryHash);
    QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
  }

  for (;;) {
    SSDataBlock* pBlock = NULL;
    code = pChild->fpSet.getNextFn(pChild, &pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pBlock == NULL) {
      break;
    }

    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);

    // Control blocks are handled here; each handler either moves on to the next block or returns.
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
      bool needPkDelete = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator);
      code = deleteCountWinState(&pInfo->streamAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, pInfo->pPkDeleted,
                                 needPkDelete);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    }

    if (pBlock->info.type == STREAM_CLEAR) {
      doResetCountWindows(&pInfo->streamAggSup, pBlock);
      continue;
    }

    if (pBlock->info.type == STREAM_GET_ALL) {
      pInfo->recvGetAll = true;
      code = getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    }

    if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
      // Passed through to the caller untouched.
      (*ppRes) = pBlock;
      return code;
    }

    if (pBlock->info.type == STREAM_CHECKPOINT) {
      pAggSup->stateStore.streamStateCommit(pAggSup->pState);
      doStreamCountSaveCheckpoint(pOperator);
      code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    }

    // Anything left must be a data block; every other type is an internal error.
    if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pScalar = &pInfo->scalarSupp;
      code = projectApplyFunctions(pScalar->pExprInfo, pBlock, pBlock, pScalar->pCtx, pScalar->numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doStreamCountAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted);

    // Track the largest timestamp seen so far, from both the block window and its watermark.
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
  }

  // Downstream is drained: later calls only hand out the results prepared below.
  pOperator->status = OP_RES_TO_RETURN;

  code = closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
  QUERY_CHECK_CODE(code, lino, _end);

  code = copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
  QUERY_CHECK_CODE(code, lino, _end);

  removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated);
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  pInfo->pUpdated = NULL;

  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator)) {
    code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  SSDataBlock* pFirstRes = NULL;
  code = buildCountResult(pOperator, &pFirstRes);
  QUERY_CHECK_CODE(code, lino, _end);
  if (pFirstRes != NULL) {
    (*ppRes) = pFirstRes;
    return code;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  setStreamOperatorCompleted(pOperator);
  (*ppRes) = NULL;
  return code;
}
764

765
/**
 * Persist the operator's maximum timestamp and forward the release request downstream.
 *
 * twAggSup.maxTs is copied into a temporary buffer, saved in the state store under
 * STREAM_COUNT_OP_STATE_NAME, and the state is committed. The first downstream operator's
 * releaseStreamStateFn is then invoked when it is set.
 *
 * On allocation failure nothing is saved, terrno is set to the error code and the failure is logged.
 *
 * @param pOperator the count-window operator whose state is released
 */
void streamCountReleaseState(SOperatorInfo* pOperator) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  // The info attached to this operator is an SStreamCountAggOperatorInfo (see
  // createStreamCountAggOperatorInfo); it must not be viewed through another operator's info type.
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  int32_t                      resSize = sizeof(TSKEY);
  char*                        pBuff = taosMemoryCalloc(1, resSize);
  QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);

  memcpy(pBuff, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
  qDebug("===stream=== count window operator relase state. ");
  pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_STATE_NAME,
                                                     strlen(STREAM_COUNT_OP_STATE_NAME), pBuff, resSize);
  pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
  taosMemoryFreeClear(pBuff);

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->fpSet.releaseStreamStateFn) {
    downstream->fpSet.releaseStreamStateFn(downstream);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
79✔
790

791
/**
 * Reload the operator's maximum timestamp from the state store and forward the reload downstream.
 *
 * The value stored under STREAM_COUNT_OP_STATE_NAME is read back and merged into twAggSup.maxTs
 * (the larger of the two is kept). The first downstream operator's reloadStreamStateFn is then
 * invoked when it is set, followed by reloadAggSupFromDownStream().
 *
 * On failure terrno is set to the error code, the failure is logged, and the downstream reload is
 * not performed.
 *
 * @param pOperator the count-window operator whose state is reloaded
 */
void streamCountReloadState(SOperatorInfo* pOperator) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
  int32_t                      size = 0;
  void*                        pBuf = NULL;

  code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_COUNT_OP_STATE_NAME,
                                                strlen(STREAM_COUNT_OP_STATE_NAME), &pBuf, &size);
  QUERY_CHECK_CODE(code, lino, _end);

  // Never read a TSKEY out of a missing or truncated buffer.
  if (pBuf == NULL || size < (int32_t)sizeof(TSKEY)) {
    if (pBuf != NULL) {
      taosMemoryFree(pBuf);
    }
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  TSKEY ts = *(TSKEY*)pBuf;
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
  taosMemoryFree(pBuf);

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->fpSet.reloadStreamStateFn) {
    downstream->fpSet.reloadStreamStateFn(downstream);
  }
  reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
79✔
820

821
int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
308✔
822
                                         SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
823
  QRY_PARAM_CHECK(pOptrInfo);
308!
824

825
  SCountWinodwPhysiNode*       pCountNode = (SCountWinodwPhysiNode*)pPhyNode;
308✔
826
  int32_t                      numOfCols = 0;
308✔
827
  int32_t                      code = TSDB_CODE_SUCCESS;
308✔
828
  int32_t                      lino = 0;
308✔
829
  SStreamCountAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamCountAggOperatorInfo));
308✔
830
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
308✔
831
  if (pInfo == NULL || pOperator == NULL) {
308!
832
    code = terrno;
×
833
    QUERY_CHECK_CODE(code, lino, _error);
×
834
  }
835

836
  pOperator->pTaskInfo = pTaskInfo;
308✔
837

838
  initResultSizeInfo(&pOperator->resultInfo, 4096);
308✔
839
  if (pCountNode->window.pExprs != NULL) {
308!
840
    int32_t    numOfScalar = 0;
×
841
    SExprInfo* pScalarExprInfo = NULL;
×
842
    code = createExprInfo(pCountNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
×
843
    QUERY_CHECK_CODE(code, lino, _error);
×
844

845
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
×
846
    QUERY_CHECK_CODE(code, lino, _error);
×
847
  }
848
  SExprSupp* pExpSup = &pOperator->exprSupp;
308✔
849

850
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
308✔
851
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
308!
852
  pInfo->binfo.pRes = pResBlock;
308✔
853

854
  SExprInfo*   pExprInfo = NULL;
308✔
855
  code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
308✔
856
  QUERY_CHECK_CODE(code, lino, _error);
308!
857

858
  code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
308✔
859
  QUERY_CHECK_CODE(code, lino, _error);
308!
860

861
  pInfo->twAggSup = (STimeWindowAggSupp){
308✔
862
      .waterMark = pCountNode->window.watermark,
308✔
863
      .calTrigger = pCountNode->window.triggerType,
308✔
864
      .maxTs = INT64_MIN,
865
      .minTs = INT64_MAX,
866
      .deleteMark = getDeleteMark(&pCountNode->window, 0),
308✔
867
  };
868

869
  pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
308✔
870
  code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
308✔
871
                                sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
872
                                GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex,
308✔
873
                                STREAM_STATE_BUFF_SORT, 1);
874
  QUERY_CHECK_CODE(code, lino, _error);
308!
875

876
  pInfo->streamAggSup.windowCount = pCountNode->windowCount;
308✔
877
  pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
308✔
878

879
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
308✔
880
  QUERY_CHECK_CODE(code, lino, _error);
308!
881

882
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
308✔
883
  pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
308✔
884
  QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno);
308!
885
  pInfo->pDelIterator = NULL;
308✔
886

887
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
308✔
888
  QUERY_CHECK_CODE(code, lino, _error);
308!
889

890
  pInfo->ignoreExpiredData = pCountNode->window.igExpired;
308✔
891
  pInfo->ignoreExpiredDataSaved = false;
308✔
892
  pInfo->pUpdated = NULL;
308✔
893
  pInfo->pStUpdated = NULL;
308✔
894
  pInfo->dataVersion = 0;
308✔
895
  pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
308✔
896
  if (!pInfo->historyWins) {
308!
897
    code = terrno;
×
898
    QUERY_CHECK_CODE(code, lino, _error);
×
899
  }
900

901
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
308✔
902
  QUERY_CHECK_CODE(code, lino, _error);
308!
903

904
  pInfo->recvGetAll = false;
308✔
905
  pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
308✔
906
  QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
308!
907
  pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimaryKey;
308✔
908

909
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
308✔
910
  setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,
308✔
911
                  OP_NOT_OPENED, pInfo, pTaskInfo);
912
  // for stream
913
  void*   buff = NULL;
308✔
914
  int32_t len = 0;
308✔
915
  int32_t res =
916
      pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
308✔
917
                                                        strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), &buff, &len);
918
  if (res == TSDB_CODE_SUCCESS) {
308!
919
    code = doStreamCountDecodeOpState(buff, len, pOperator, true);
×
920
    QUERY_CHECK_CODE(code, lino, _error);
×
921
    taosMemoryFree(buff);
×
922
  }
923
  pInfo->pOperator = pOperator;
308✔
924
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAggNext, NULL, destroyStreamCountAggOperatorInfo,
308✔
925
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
926
  setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
308✔
927

928
  if (downstream) {
308!
929
    code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
308✔
930
                          &pInfo->twAggSup, &pInfo->basic);
308✔
931
    QUERY_CHECK_CODE(code, lino, _error);
308!
932

933
    code = appendDownstream(pOperator, &downstream, 1);
308✔
934
    QUERY_CHECK_CODE(code, lino, _error);
308!
935
  }
936

937
  *pOptrInfo = pOperator;
308✔
938
  return TSDB_CODE_SUCCESS;
308✔
939

940
_error:
×
941
  if (pInfo != NULL) {
×
942
    destroyStreamCountAggOperatorInfo(pInfo);
×
943
  }
944

945
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
946
  pTaskInfo->code = code;
×
947
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
948
  return code;
×
949
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc