• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3561

19 Dec 2024 03:15AM UTC coverage: 58.812% (-1.3%) from 60.124%
#3561

push

travis-ci

web-flow
Merge pull request #29213 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

130770 of 287658 branches covered (45.46%)

Branch coverage included in aggregate %.

32 of 78 new or added lines in 4 files covered. (41.03%)

7347 existing lines in 166 files now uncovered.

205356 of 283866 relevant lines covered (72.34%)

7187865.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.55
/source/libs/executor/src/streamcountwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "function.h"
17
#include "functionMgt.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "streamexecutorInt.h"
21
#include "tchecksum.h"
22
#include "tcommon.h"
23
#include "tdatablock.h"
24
#include "tglobal.h"
25
#include "tlog.h"
26
#include "ttime.h"
27

28
// True when `op` is the plain stream-count physical operator.
#define IS_NORMAL_COUNT_OP(op)          ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT)
// State name for the count operator's history state (not referenced in the visible part of this file).
#define STREAM_COUNT_OP_STATE_NAME      "StreamCountHistoryState"
// Key under which doStreamCountSaveCheckpoint() stores the encoded operator state.
#define STREAM_COUNT_OP_CHECKPOINT_NAME "StreamCountOperator_Checkpoint"
31

32
typedef struct SCountWindowInfo {
33
  SResultWindowInfo winInfo;
34
  COUNT_TYPE*       pWindowCount;
35
} SCountWindowInfo;
36

37
// Tells setCountOutputBuf() how to obtain the window buffer in sliding mode.
typedef enum {
  NONE_WINDOW = 0,    // seek a cursor from the current row's key
  CREATE_NEW_WINDOW,  // add a new window for the current row
  MOVE_NEXT_WINDOW,   // advance the existing cursor to the next window
} BuffOp;
42
typedef struct SBuffInfo {
43
  bool             rebuildWindow;
44
  BuffOp           winBuffOp;
45
  SStreamStateCur* pCur;
46
} SBuffInfo;
47

48
void destroyStreamCountAggOperatorInfo(void* param) {
290✔
49
  if (param == NULL) {
290!
50
    return;
×
51
  }
52
  SStreamCountAggOperatorInfo* pInfo = (SStreamCountAggOperatorInfo*)param;
290✔
53
  cleanupBasicInfo(&pInfo->binfo);
290✔
54
  if (pInfo->pOperator) {
290!
55
    cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
290✔
56
                              &pInfo->groupResInfo);
57
    pInfo->pOperator = NULL;
290✔
58
  }
59
  destroyStreamAggSupporter(&pInfo->streamAggSup);
290✔
60
  cleanupExprSupp(&pInfo->scalarSupp);
290✔
61
  clearGroupResInfo(&pInfo->groupResInfo);
290✔
62
  taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
290✔
63
  pInfo->pUpdated = NULL;
290✔
64

65
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
290✔
66
  blockDataDestroy(pInfo->pDelRes);
290✔
67
  tSimpleHashCleanup(pInfo->pStUpdated);
290✔
68
  tSimpleHashCleanup(pInfo->pStDeleted);
290✔
69
  cleanupGroupResInfo(&pInfo->groupResInfo);
290✔
70

71
  taosArrayDestroy(pInfo->historyWins);
290✔
72
  blockDataDestroy(pInfo->pCheckpointRes);
290✔
73

74
  tSimpleHashCleanup(pInfo->pPkDeleted);
290✔
75

76
  taosMemoryFreeClear(param);
290!
77
}
78

79
// Returns true when the window slides, i.e. windowSliding differs from windowCount.
bool isSlidingCountWindow(SStreamAggSupporter* pAggSup) { return pAggSup->windowCount != pAggSup->windowSliding; }
80

81
int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, SCountWindowInfo* pCurWin,
5,091✔
82
                          SBuffInfo* pBuffInfo) {
83
  int32_t code = TSDB_CODE_SUCCESS;
5,091✔
84
  int32_t lino = 0;
5,091✔
85
  int32_t winCode = TSDB_CODE_SUCCESS;
5,091✔
86
  int32_t size = pAggSup->resultRowSize;
5,091✔
87
  pCurWin->winInfo.sessionWin.groupId = groupId;
5,091✔
88
  pCurWin->winInfo.sessionWin.win.skey = ts;
5,091✔
89
  pCurWin->winInfo.sessionWin.win.ekey = ts;
5,091✔
90

91
  if (isSlidingCountWindow(pAggSup)) {
5,091✔
92
    if (pBuffInfo->winBuffOp == CREATE_NEW_WINDOW) {
405✔
93
      code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
33✔
94
                                                        (void**)&pCurWin->winInfo.pStatePos, &size);
33✔
95
      QUERY_CHECK_CODE(code, lino, _end);
33!
96

97
      winCode = TSDB_CODE_FAILED;
33✔
98
    } else if (pBuffInfo->winBuffOp == MOVE_NEXT_WINDOW) {
372✔
99
      QUERY_CHECK_NULL(pBuffInfo->pCur, code, lino, _end, terrno);
17!
100
      pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pBuffInfo->pCur);
17✔
101
      winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
34✔
102
                                                                 (void**)&pCurWin->winInfo.pStatePos, &size);
17✔
103
      if (winCode == TSDB_CODE_FAILED) {
17✔
104
        code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
1✔
105
                                                          (void**)&pCurWin->winInfo.pStatePos, &size);
1✔
106
        QUERY_CHECK_CODE(code, lino, _end);
1!
107
      } else {
108
        reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
16✔
109
      }
110
    } else {
111
      pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin,
710✔
112
                                                                        pAggSup->windowCount);
355✔
113
      winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
710✔
114
                                                                 (void**)&pCurWin->winInfo.pStatePos, &size);
355✔
115
      if (winCode == TSDB_CODE_FAILED) {
355✔
116
        code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
221✔
117
                                                          (void**)&pCurWin->winInfo.pStatePos, &size);
221✔
118
        QUERY_CHECK_CODE(code, lino, _end);
221!
119
      } else {
120
        reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
134✔
121
      }
122
    }
123
    if (ts < pCurWin->winInfo.sessionWin.win.ekey) {
411✔
124
      pBuffInfo->rebuildWindow = true;
9✔
125
    }
126
  } else {
127
    code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
4,692✔
128
                                                                pAggSup->windowCount,
4,692✔
129
                                                                (void**)&pCurWin->winInfo.pStatePos, &size, &winCode);
4,692✔
130
    QUERY_CHECK_CODE(code, lino, _end);
4,689!
131
  }
132

133
  if (winCode == TSDB_CODE_SUCCESS) {
5,100✔
134
    pCurWin->winInfo.isOutput = true;
2,990✔
135
  }
136
  pCurWin->pWindowCount =
5,100✔
137
      (COUNT_TYPE*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - sizeof(COUNT_TYPE)));
5,100✔
138

139
  if (*pCurWin->pWindowCount == pAggSup->windowCount) {
5,100✔
140
    pBuffInfo->rebuildWindow = true;
37✔
141
  }
142

143
_end:
5,063✔
144
  if (code != TSDB_CODE_SUCCESS) {
5,100!
145
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
146
  }
147
  return code;
5,102✔
148
}
149

150
/**
 * Remove the hash-key form of `pKey` from both maps.
 *
 * A failed removal is only logged at info level and otherwise ignored.
 */
static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
  SSessionKey key = {0};
  getSessionHashKey(pKey, &key);
  int32_t code = tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
  if (code != TSDB_CODE_SUCCESS) {
    qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  }

  code = tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
  if (code != TSDB_CODE_SUCCESS) {
    qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  }
}
163

164
/**
 * Extend a count window with rows pTs[start..] and update its key and counter.
 *
 * @param pAggSup     aggregate supporter (state store, pResultRows)
 * @param pWinInfo    window being extended; its key and *pWindowCount are updated
 * @param pTs         timestamp column of the block
 * @param start       index of the first row to add
 * @param rows        number of rows in the block
 * @param maxRows     window capacity (callers pass windowCount)
 * @param pStUpdated  map from which the old window key is removed when the key changes
 * @param pStDeleted  optional map that records the old key as deleted if it was already output
 * @param pRebuild    [out] set to true when more incoming rows precede the window's ekey
 *                    than the window can still hold
 * @param pWinRows    [out] number of rows assigned to the window (left untouched on error)
 * @return TSDB_CODE_SUCCESS or the error from saveDeleteRes()
 */
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs,
                                     int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated,
                                     SSHashObj* pStDeleted, bool* pRebuild, int32_t* pWinRows) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;  // key before any change; used for removal below
  int32_t     num = 0;
  // Count the leading rows whose timestamp is before the window's current end.
  for (int32_t i = start; i < rows; i++) {
    if (pTs[i] < pWinInfo->winInfo.sessionWin.win.ekey) {
      num++;
    } else {
      break;
    }
  }
  // Rows that fit: remaining capacity, bounded by the rows left in the block.
  int32_t maxNum = TMIN(maxRows - *pWinInfo->pWindowCount, rows - start);
  if (num > maxNum) {
    *pRebuild = true;
  }
  *pWinInfo->pWindowCount += maxNum;
  bool needDelState = false;
  if (pWinInfo->winInfo.sessionWin.win.skey > pTs[start]) {
    needDelState = true;
    if (pStDeleted && pWinInfo->winInfo.isOutput) {
      code = saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    pWinInfo->winInfo.sessionWin.win.skey = pTs[start];
  }

  if (pWinInfo->winInfo.sessionWin.win.ekey < pTs[maxNum + start - 1]) {
    needDelState = true;
    pWinInfo->winInfo.sessionWin.win.ekey = pTs[maxNum + start - 1];
  }

  if (needDelState) {
    // The key changed: rewrite it in the state position and drop entries stored under the old key.
    memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
    removeCountResult(pStUpdated, pAggSup->pResultRows, &sWinKey);
    if (pWinInfo->winInfo.pStatePos->needFree) {
      pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
    }
  }

  (*pWinRows) = maxNum;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
215

216
/**
 * Compute the time range of stored count windows associated with `pKey`.
 *
 * *pDelRange starts as a copy of *pKey. If a window is found at the seek position,
 * its time window replaces pDelRange->win. For STREAM_DELETE_DATA and
 * STREAM_PARTITION_DELETE_DATA the cursor is then advanced until the lookup fails,
 * and ekey is widened to the largest ekey seen.
 *
 * @param pAggSup    aggregate supporter (state store)
 * @param pKey       key to look up
 * @param mode       stream block type; only the two delete modes walk forward
 * @param pDelRange  [out] resulting range; equals *pKey when no window is found
 */
void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, EStreamType mode, SSessionKey* pDelRange) {
  *pDelRange = *pKey;
  SStreamStateCur* pCur = NULL;
  if (isSlidingCountWindow(pAggSup)) {
    pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, pKey, pAggSup->windowCount);
  } else {
    pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey);
  }
  SSessionKey tmpKey = {.groupId = pKey->groupId, .win.ekey = INT64_MIN, .win.skey = INT64_MIN};
  // Only the key is needed here, so no value pointer or length is requested.
  int32_t     code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    pAggSup->stateStore.streamStateFreeCur(pCur);
    return;
  }
  pDelRange->win = tmpKey.win;
  while (mode == STREAM_DELETE_DATA || mode == STREAM_PARTITION_DELETE_DATA) {
    pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
    code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
    pDelRange->win.ekey = TMAX(pDelRange->win.ekey, tmpKey.win.ekey);
  }
  pAggSup->stateStore.streamStateFreeCur(pCur);
}
241

242
// Release the state cursor held by `pBuffInfo` (the structure itself is not freed).
static void destroySBuffInfo(SStreamAggSupporter* pAggSup, SBuffInfo* pBuffInfo) {
  pAggSup->stateStore.streamStateFreeCur(pBuffInfo->pCur);
}
245

246
/**
 * Decide whether window `pWin` should be processed for the range [sKey, eKey].
 *
 * Non-sliding windows (windowCount == windowSliding) always qualify; a sliding
 * window qualifies only when it lies entirely inside [sKey, eKey].
 */
bool inCountCalSlidingWindow(SStreamAggSupporter* pAggSup, STimeWindow* pWin, TSKEY sKey, TSKEY eKey) {
  if (pAggSup->windowCount == pAggSup->windowSliding) {
    return true;
  }
  return sKey <= pWin->skey && pWin->ekey <= eKey;
}
255

256
// Same test as inCountCalSlidingWindow(), using the block's calWin as the range.
bool inCountSlidingWindow(SStreamAggSupporter* pAggSup, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  return inCountCalSlidingWindow(pAggSup, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
}
259

260
/**
 * Feed one data block into the count-window aggregation.
 *
 * Rows are consumed window by window: for each position `i` a window buffer is
 * obtained with setCountOutputBuf(), extended with updateCountWindowInfo(),
 * aggregated with doOneWindowAggImpl() and saved. When a window must be rebuilt,
 * a rescan range is appended to pAggSup->pScanBlock and processing of this block
 * stops. Errors are logged; the function returns nothing.
 *
 * @param pOperator    the stream count operator
 * @param pSDataBlock  input block; timestamps are read from column pInfo->primaryTsIndex
 * @param pStUpdated   map collecting updated windows (may be NULL)
 * @param pStDeleted   map collecting deleted windows (may be NULL)
 */
static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
                                 SSHashObj* pStDeleted) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
  uint64_t                     groupId = pSDataBlock->info.id.groupId;
  SResultRow*                  pResult = NULL;
  int32_t                      rows = pSDataBlock->info.rows;
  int32_t                      winRows = 0;
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
  SBuffInfo                    buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};

  pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
  pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
  if (pAggSup->winRange.ekey <= 0) {
    pAggSup->winRange.ekey = INT64_MAX;
  }

  SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
  if (!pStartTsCol) {
    code = TSDB_CODE_FAILED;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  TSKEY* startTsCols = (int64_t*)pStartTsCol->pData;
  code = blockDataEnsureCapacity(pAggSup->pScanBlock, rows * 2);
  QUERY_CHECK_CODE(code, lino, _end);

  COUNT_TYPE slidingRows = 0;

  // `i` is advanced inside the body by the number of rows consumed.
  for (int32_t i = 0; i < rows;) {
    if (pInfo->ignoreExpiredData &&
        checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup,
                         pSDataBlock->info.id.uid, startTsCols[i], NULL, 0)) {
      i++;
      continue;
    }
    SCountWindowInfo curWin = {0};
    buffInfo.rebuildWindow = false;
    code = setCountOutputBuf(pAggSup, startTsCols[i], groupId, &curWin, &buffInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!inCountSlidingWindow(pAggSup, &curWin.winInfo.sessionWin.win, &pSDataBlock->info)) {
      // Window is outside the block's calculation range: retry the same row with the next window.
      buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
      continue;
    }
    setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
    slidingRows = *curWin.pWindowCount;  // row count before this block's rows are added
    if (!buffInfo.rebuildWindow) {
      code = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated, pStDeleted,
                                   &buffInfo.rebuildWindow, &winRows);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    if (buffInfo.rebuildWindow) {
      SSessionKey range = {0};
      if (isSlidingCountWindow(pAggSup)) {
        curWin.winInfo.sessionWin.win.skey = startTsCols[i];
        curWin.winInfo.sessionWin.win.ekey = startTsCols[i];
      }
      getCountWinRange(pAggSup, &curWin.winInfo.sessionWin, STREAM_DELETE_DATA, &range);
      range.win.skey = TMIN(startTsCols[i], range.win.skey);
      range.win.ekey = TMAX(startTsCols[rows - 1], range.win.ekey);
      uint64_t uid = 0;
      code =
          appendDataToSpecialBlock(pAggSup->pScanBlock, &range.win.skey, &range.win.ekey, &uid, &range.groupId, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
      break;
    }
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
                              pOperator, 0);
    QUERY_CHECK_CODE(code, lino, _end);

    code = saveSessionOutputBuf(pAggSup, &curWin.winInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_COUNT_OP(pOperator)) {
      code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
      code = saveResult(curWin.winInfo, pStUpdated);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
      curWin.winInfo.pStatePos->beUpdated = true;
      SSessionKey key = {0};
      getSessionHashKey(&curWin.winInfo.sessionWin, &key);
      code =
          tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (isSlidingCountWindow(pAggSup)) {
      if (slidingRows <= pAggSup->windowSliding) {
        if (slidingRows + winRows > pAggSup->windowSliding) {
          // The window crossed the sliding boundary: the next window starts at that row.
          buffInfo.winBuffOp = CREATE_NEW_WINDOW;
          winRows = pAggSup->windowSliding - slidingRows;
        }
      } else {
        // Already past the sliding boundary: feed the same rows to the next window.
        buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
        winRows = 0;
      }
    }
    i += winRows;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  destroySBuffInfo(pAggSup, &buffInfo);
}
375

376
/**
 * Produce the next output block of the operator.
 *
 * The delete block is built first and returned if it has rows; otherwise the
 * result block is built and returned if it has rows; otherwise *ppRes is NULL.
 *
 * @param pOperator  the stream count operator
 * @param ppRes      [out] block to emit, or NULL when nothing is pending
 * @return always TSDB_CODE_SUCCESS in the current implementation
 */
static int32_t buildCountResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
  if (pInfo->pDelRes->info.rows > 0) {
    printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
    (*ppRes) = pInfo->pDelRes;
    return code;
  }

  doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
  if (pBInfo->pRes->info.rows > 0) {
    printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
    (*ppRes) = pBInfo->pRes;
    return code;
  }
  (*ppRes) = NULL;
  return code;
}
398

UNCOV
399
/**
 * Serialize the operator state: pResultRows, twAggSup, dataVersion and, for the
 * parent operator, a trailing checksum.
 *
 * doStreamCountSaveCheckpoint() calls this twice: first with buf == NULL to obtain
 * the required length, then with a buffer of that length.
 *
 * @param buf       in/out write cursor, or NULL to only compute the length
 * @param len       total buffer length; used to locate the checksum range
 * @param pOperator the stream count operator
 * @param isParent  whether to append the checksum
 * @return number of bytes encoded (or required); 0 when the operator has no info
 */
int32_t doStreamCountEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  if (!pInfo) {
    return 0;
  }

  // Remember the buffer start: the checksum below covers everything from here.
  void* pData = (buf == NULL) ? NULL : *buf;

  // 1.streamAggSup.pResultRows
  int32_t tlen = 0;
  int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
  tlen += taosEncodeFixedI32(buf, mapSize);
  void*   pIte = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
    void* key = tSimpleHashGetKey(pIte, &keyLen);
    tlen += encodeSSessionKey(buf, key);
    tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
  }

  // 2.twAggSup
  tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);

  // 3.dataVersion
  // Written as 64 bits to match taosDecodeFixedI64() in doStreamCountDecodeOpState();
  // the previous 32-bit encoding made the decoder read 4 bytes too many.
  tlen += taosEncodeFixedI64(buf, pInfo->dataVersion);

  // 4.checksum
  if (isParent) {
    if (buf) {
      uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
      tlen += taosEncodeFixedU32(buf, cksum);
    } else {
      tlen += sizeof(uint32_t);
    }
  }

  return tlen;
}
438

UNCOV
439
/**
 * Restore operator state from a buffer written by doStreamCountEncodeOpState().
 *
 * @param buf       encoded state
 * @param len       length of `buf`; the last 4 bytes hold the checksum when isParent
 * @param pOperator the stream count operator
 * @param isParent  whether to verify the trailing checksum first
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_FAILED on missing info or checksum mismatch,
 *         or the error of a failing helper
 */
int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  if (!pInfo) {
    code = TSDB_CODE_FAILED;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // 4.checksum
  if (isParent) {
    int32_t dataLen = len - sizeof(uint32_t);
    void*   pCksum = POINTER_SHIFT(buf, dataLen);
    if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_FAILED;
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  // 1.streamAggSup.pResultRows
  int32_t mapSize = 0;
  buf = taosDecodeFixedI32(buf, &mapSize);
  for (int32_t i = 0; i < mapSize; i++) {
    SSessionKey      key = {0};
    SCountWindowInfo curWin = {0};
    buf = decodeSSessionKey(buf, &key);
    SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
    code = setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo);
    if (code != TSDB_CODE_SUCCESS) {
      // setCountOutputBuf() may have stored a cursor in buffInfo even when it fails.
      destroySBuffInfo(&pInfo->streamAggSup, &buffInfo);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
    code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo,
                          sizeof(SResultWindowInfo));
    // Release the per-iteration cursor before the error check so it is never leaked.
    destroySBuffInfo(&pInfo->streamAggSup, &buffInfo);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // 2.twAggSup
  buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);

  // 3.dataVersion
  buf = taosDecodeFixedI64(buf, &pInfo->dataVersion);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
488

489
/**
 * Encode the operator state and store it in the state store under
 * STREAM_COUNT_OP_CHECKPOINT_NAME, when needSaveStreamOperatorInfo() says a save
 * is required. Failures are logged; nothing is returned.
 */
void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  void*                        pBuf = NULL;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  if (needSaveStreamOperatorInfo(&pInfo->basic)) {
    // First pass with a NULL buffer only computes the encoded length.
    int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true);
    pBuf = taosMemoryCalloc(1, len);
    if (!pBuf) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    // The encoder advances the pointer it is given, so hand it a copy and keep pBuf for saving/freeing.
    void* pTmpBuf = pBuf;
    len = doStreamCountEncodeOpState(&pTmpBuf, len, pOperator, true);
    pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
                                                       strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), pBuf, len);
    saveStreamOperatorStateComplete(&pInfo->basic);
  }

_end:
  taosMemoryFreeClear(pBuf);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
515

516
/**
 * Reset the stored state of count windows named by each row of `pBlock`.
 *
 * For every row, windows of the row's group are walked from the seek position
 * until the lookup fails or a window starts after the row's end timestamp. Each
 * window that passes inCountCalSlidingWindout-style range filtering against the
 * row's calculate-start/calculate-end range is reset in the state store.
 *
 * @param pAggSup  aggregate supporter (state store)
 * @param pBlock   block carrying start/end, calculate start/end and group-id columns
 */
void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) {
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  if (!pStartTsCol || !pEndTsCol || !pCalStartTsCol || !pCalEndTsCol || !pGroupCol) {
    // A missing column would otherwise be dereferenced below.
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_FAILED));
    return;
  }
  TSKEY*    startDatas = (TSKEY*)pStartTsCol->pData;
  TSKEY*    endDatas = (TSKEY*)pEndTsCol->pData;
  // Read the calculate range from its own columns (previously these aliased the start/end columns).
  TSKEY*    calStartDatas = (TSKEY*)pCalStartTsCol->pData;
  TSKEY*    calEndDatas = (TSKEY*)pCalEndTsCol->pData;
  uint64_t* gpDatas = (uint64_t*)pGroupCol->pData;

  SRowBuffPos* pPos = NULL;
  int32_t      size = 0;
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
    SSessionKey      key = {.groupId = gpDatas[i], .win.skey = startDatas[i], .win.ekey = endDatas[i]};
    SStreamStateCur* pCur = NULL;
    if (isSlidingCountWindow(pAggSup)) {
      pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &key, pAggSup->windowCount);
    } else {
      pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, &key);
    }
    while (1) {
      SSessionKey tmpKey = {.groupId = gpDatas[i], .win.skey = INT64_MIN, .win.ekey = INT64_MIN};
      int32_t     code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, (void**)&pPos, &size);
      if (code != TSDB_CODE_SUCCESS || tmpKey.win.skey > endDatas[i]) {
        pAggSup->stateStore.streamStateFreeCur(pCur);
        break;
      }
      if (!inCountCalSlidingWindow(pAggSup, &tmpKey.win, calStartDatas[i], calEndDatas[i])) {
        pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
        continue;
      }
      pAggSup->stateStore.streamStateSessionReset(pAggSup->pState, pPos->pRowBuff);
      pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
    }
  }
}
554

555
/**
 * Delete every stored count window that falls in the ranges listed in `pBlock`.
 *
 * For each row, windows are looked up by range and deleted one at a time until
 * the lookup fails; each deleted key is appended to `result` when it is non-NULL.
 *
 * @param pAggSup  aggregate supporter (state store)
 * @param pBlock   block carrying start/end timestamp and group-id columns
 * @param result   optional array receiving the deleted window keys
 * @return TSDB_CODE_SUCCESS, or the error from a missing column or saveDeleteInfo()
 */
int32_t doDeleteCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  QUERY_CHECK_NULL(pStartTsCol, code, lino, _end, terrno);
  TSKEY*           startDatas = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  QUERY_CHECK_NULL(pEndTsCol, code, lino, _end, terrno);
  TSKEY*           endDatas = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  QUERY_CHECK_NULL(pGroupCol, code, lino, _end, terrno);
  uint64_t*        gpDatas = (uint64_t*)pGroupCol->pData;
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
    SSessionKey key = {.win.skey = startDatas[i], .win.ekey = endDatas[i], .groupId = gpDatas[i]};
    // Each pass deletes one window; the loop ends when no window is left in the range.
    while (1) {
      SSessionKey curWin = {0};
      int32_t     winCode = pAggSup->stateStore.streamStateCountGetKeyByRange(pAggSup->pState, &key, &curWin);
      if (winCode != TSDB_CODE_SUCCESS) {
        break;
      }
      doDeleteSessionWindow(pAggSup, &curWin);
      if (result) {
        code = saveDeleteInfo(result, curWin);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
590

591
/**
 * Delete the windows named by `pBlock` from the state and propagate the deletion
 * to the bookkeeping maps.
 *
 * @param pAggSup     aggregate supporter
 * @param pBlock      block describing the ranges to delete
 * @param pMapUpdate  map of updated windows; deleted windows are removed from it
 * @param pMapDelete  map that receives the deleted window keys
 * @param pPkDelete   map that additionally receives the keys when needAdd is true
 * @param needAdd     whether to copy the deleted keys into pPkDelete
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
int32_t deleteCountWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
                            SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
  if (!pWins) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (isSlidingCountWindow(pAggSup)) {
    code = doDeleteCountWindows(pAggSup, pBlock, pWins);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    code = doDeleteTimeWindows(pAggSup, pBlock, pWins);
    QUERY_CHECK_CODE(code, lino, _end);
  }
  removeSessionResults(pAggSup, pMapUpdate, pWins);
  code = copyDeleteWindowInfo(pWins, pMapDelete);
  QUERY_CHECK_CODE(code, lino, _end);
  if (needAdd) {
    code = copyDeleteWindowInfo(pWins, pPkDelete);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  // Destroyed after the label so that every error path releases the array too.
  taosArrayDestroy(pWins);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
623

624
/*
 * Next-block callback of the stream count-window aggregate operator.
 *
 * Depending on pOperator->status the call either
 *   - hands back NULL immediately (OP_EXEC_DONE),
 *   - keeps returning results prepared by an earlier call (OP_RES_TO_RETURN), or
 *   - reads blocks from the first downstream operator until it yields NULL,
 *     dispatching each block on its stream type, and then builds a result.
 *
 * ppRes receives the block for the caller, or NULL when nothing is left for
 * this round. When a step fails, the code is logged, stored in pTaskInfo->code
 * and passed to T_LONG_JMP.
 */
static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
  SExprSupp*                   pSup = &pOperator->exprSupp;

  qDebug("stask:%s  %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
    // Results of a previous round may still be pending; return them first.
    SSDataBlock* pPending = NULL;
    code = buildCountResult(pOperator, &pPending);
    QUERY_CHECK_CODE(code, lino, _end);
    if (pPending != NULL) {
      (*ppRes) = pPending;
      return code;
    }

    if (pInfo->recvGetAll) {
      pInfo->recvGetAll = false;
      resetUnCloseSessionWinInfo(pAggSup->pResultRows);
    }

    if (pInfo->reCkBlock) {
      // Hand the stored checkpoint block downstream exactly once.
      pInfo->reCkBlock = false;
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      (*ppRes) = pInfo->pCheckpointRes;
      return code;
    }

    setStreamOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

  SOperatorInfo* pChild = pOperator->pDownstream[0];

  // Lazily create the per-round containers of updated windows.
  if (pInfo->pUpdated == NULL) {
    pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
    QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
  }
  if (pInfo->pStUpdated == NULL) {
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
    pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
    QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
  }

  for (;;) {
    SSDataBlock* pBlock = NULL;
    code = pChild->fpSet.getNextFn(pChild, &pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pBlock == NULL) {
      break;
    }

    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);

    // Control blocks are fully handled here and never reach the aggregation.
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
      bool needPkDelete = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator);
      code = deleteCountWinState(pAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, pInfo->pPkDeleted,
                                 needPkDelete);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    }

    if (pBlock->info.type == STREAM_CLEAR) {
      doResetCountWindows(pAggSup, pBlock);
      continue;
    }

    if (pBlock->info.type == STREAM_GET_ALL) {
      pInfo->recvGetAll = true;
      code = getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    }

    if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
      // Forwarded to the caller untouched.
      (*ppRes) = pBlock;
      return code;
    }

    if (pBlock->info.type == STREAM_CHECKPOINT) {
      pAggSup->stateStore.streamStateCommit(pAggSup->pState);
      doStreamCountSaveCheckpoint(pOperator);
      code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    }

    // Only normal / invalid-typed blocks may carry data to aggregate.
    if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      // Evaluate the scalar expressions in place before aggregating.
      SExprSupp* pScalar = &pInfo->scalarSupp;
      code = projectApplyFunctions(pScalar->pExprInfo, pBlock, pBlock, pScalar->pCtx, pScalar->numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    QUERY_CHECK_CODE(code, lino, _end);
    doStreamCountAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted);

    // Track the largest timestamp seen, from both the window end and the watermark.
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
  }

  // Input is drained for this round: later calls only return results.
  pOperator->status = OP_RES_TO_RETURN;

  code = closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
  QUERY_CHECK_CODE(code, lino, _end);

  code = copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
  QUERY_CHECK_CODE(code, lino, _end);

  removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated);
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  // The array was handed to groupResInfo above; forget it so the next round allocates a new one.
  pInfo->pUpdated = NULL;
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator)) {
    code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  SSDataBlock* pOut = NULL;
  code = buildCountResult(pOperator, &pOut);
  QUERY_CHECK_CODE(code, lino, _end);
  if (pOut != NULL) {
    (*ppRes) = pOut;
    return code;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  setStreamOperatorCompleted(pOperator);
  (*ppRes) = NULL;
  return code;
}
764

765
/*
 * Release-state callback of the stream count-window operator.
 *
 * Saves the operator's current twAggSup.maxTs under STREAM_COUNT_OP_STATE_NAME
 * through the state store, commits the state, and then invokes the downstream
 * operator's releaseStreamStateFn when one is set.
 *
 * Nothing is returned; on failure the code is written to terrno and logged.
 */
void streamCountReleaseState(SOperatorInfo* pOperator) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  // This is the count operator, so its info must be read through the count
  // operator struct (as streamCountReloadState does), not the event operator's.
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  int32_t                      resSize = sizeof(TSKEY);
  char*                        pBuff = taosMemoryCalloc(1, resSize);
  QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);

  // The saved payload is just the raw maxTs value.
  memcpy(pBuff, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
  qDebug("===stream=== count window operator relase state. ");
  pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_STATE_NAME,
                                                     strlen(STREAM_COUNT_OP_STATE_NAME), pBuff, resSize);
  pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
  taosMemoryFreeClear(pBuff);
  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->fpSet.releaseStreamStateFn) {
    downstream->fpSet.releaseStreamStateFn(downstream);
  }
_end:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
79✔
790

791
/*
 * Reload-state callback of the stream count-window operator.
 *
 * Reads the value stored under STREAM_COUNT_OP_STATE_NAME from the state store,
 * raises twAggSup.maxTs to it when it is larger, invokes the downstream
 * operator's reloadStreamStateFn when one is set, and finally calls
 * reloadAggSupFromDownStream() for this operator's aggregate supporter.
 *
 * Nothing is returned; if the state lookup fails the code is written to terrno
 * and logged, and the remaining steps are not executed.
 */
void streamCountReloadState(SOperatorInfo* pOperator) {
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  void*                        pSaved = NULL;
  int32_t                      savedLen = 0;

  code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_COUNT_OP_STATE_NAME,
                                                strlen(STREAM_COUNT_OP_STATE_NAME), &pSaved, &savedLen);
  QUERY_CHECK_CODE(code, lino, _end);

  // The stored payload is interpreted as a single TSKEY.
  TSKEY savedMaxTs = *(TSKEY*)pSaved;
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, savedMaxTs);
  taosMemoryFree(pSaved);

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  if (pChild->fpSet.reloadStreamStateFn != NULL) {
    pChild->fpSet.reloadStreamStateFn(pChild);
  }
  reloadAggSupFromDownStream(pChild, pAggSup);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
79✔
820

821
int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
308✔
822
                                         SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
823
  QRY_PARAM_CHECK(pOptrInfo);
308!
824

825
  SCountWinodwPhysiNode*       pCountNode = (SCountWinodwPhysiNode*)pPhyNode;
308✔
826
  int32_t                      numOfCols = 0;
308✔
827
  int32_t                      code = TSDB_CODE_SUCCESS;
308✔
828
  int32_t                      lino = 0;
308✔
829
  SStreamCountAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamCountAggOperatorInfo));
308!
830
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
308!
831
  if (pInfo == NULL || pOperator == NULL) {
308!
832
    code = terrno;
×
833
    QUERY_CHECK_CODE(code, lino, _error);
×
834
  }
835

836
  pOperator->pTaskInfo = pTaskInfo;
308✔
837

838
  initResultSizeInfo(&pOperator->resultInfo, 4096);
308✔
839
  if (pCountNode->window.pExprs != NULL) {
308!
840
    int32_t    numOfScalar = 0;
×
841
    SExprInfo* pScalarExprInfo = NULL;
×
842
    code = createExprInfo(pCountNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
×
843
    QUERY_CHECK_CODE(code, lino, _error);
×
844

845
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
×
846
    QUERY_CHECK_CODE(code, lino, _error);
×
847
  }
848
  SExprSupp* pExpSup = &pOperator->exprSupp;
308✔
849

850
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
308✔
851
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
308!
852
  pInfo->binfo.pRes = pResBlock;
308✔
853

854
  SExprInfo*   pExprInfo = NULL;
308✔
855
  code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
308✔
856
  QUERY_CHECK_CODE(code, lino, _error);
308!
857

858
  code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
308✔
859
  QUERY_CHECK_CODE(code, lino, _error);
308!
860

861
  pInfo->twAggSup = (STimeWindowAggSupp){
308✔
862
      .waterMark = pCountNode->window.watermark,
308✔
863
      .calTrigger = pCountNode->window.triggerType,
308✔
864
      .maxTs = INT64_MIN,
865
      .minTs = INT64_MAX,
866
      .deleteMark = getDeleteMark(&pCountNode->window, 0),
308✔
867
  };
868

869
  pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
308✔
870
  code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
308✔
871
                                sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
872
                                GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex,
308✔
873
                                STREAM_STATE_BUFF_SORT, 1);
874
  QUERY_CHECK_CODE(code, lino, _error);
308!
875

876
  pInfo->streamAggSup.windowCount = pCountNode->windowCount;
308✔
877
  pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
308✔
878

879
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
308✔
880
  QUERY_CHECK_CODE(code, lino, _error);
308!
881

882
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
308✔
883
  pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
308✔
884
  QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno);
308!
885
  pInfo->pDelIterator = NULL;
308✔
886

887
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
308✔
888
  QUERY_CHECK_CODE(code, lino, _error);
308!
889

890
  pInfo->ignoreExpiredData = pCountNode->window.igExpired;
308✔
891
  pInfo->ignoreExpiredDataSaved = false;
308✔
892
  pInfo->pUpdated = NULL;
308✔
893
  pInfo->pStUpdated = NULL;
308✔
894
  pInfo->dataVersion = 0;
308✔
895
  pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
308✔
896
  if (!pInfo->historyWins) {
308!
897
    code = terrno;
×
898
    QUERY_CHECK_CODE(code, lino, _error);
×
899
  }
900

901
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
308✔
902
  QUERY_CHECK_CODE(code, lino, _error);
308!
903

904
  pInfo->recvGetAll = false;
308✔
905
  pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
308✔
906
  QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
308!
907
  pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimaryKey;
308✔
908

909
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
308✔
910
  setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,
308✔
911
                  OP_NOT_OPENED, pInfo, pTaskInfo);
912
  // for stream
913
  void*   buff = NULL;
308✔
914
  int32_t len = 0;
308✔
915
  int32_t res =
916
      pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
308✔
917
                                                        strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), &buff, &len);
918
  if (res == TSDB_CODE_SUCCESS) {
308!
UNCOV
919
    code = doStreamCountDecodeOpState(buff, len, pOperator, true);
×
UNCOV
920
    QUERY_CHECK_CODE(code, lino, _error);
×
UNCOV
921
    taosMemoryFree(buff);
×
922
  }
923
  pInfo->pOperator = pOperator;
308✔
924
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAggNext, NULL, destroyStreamCountAggOperatorInfo,
308✔
925
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
926
  setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
308✔
927

928
  if (downstream) {
308!
929
    code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
308✔
930
                          &pInfo->twAggSup, &pInfo->basic);
308✔
931
    QUERY_CHECK_CODE(code, lino, _error);
308!
932

933
    code = appendDownstream(pOperator, &downstream, 1);
308✔
934
    QUERY_CHECK_CODE(code, lino, _error);
308!
935
  }
936

937
  *pOptrInfo = pOperator;
308✔
938
  return TSDB_CODE_SUCCESS;
308✔
939

940
_error:
×
941
  if (pInfo != NULL) {
×
942
    destroyStreamCountAggOperatorInfo(pInfo);
×
943
  }
944

945
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
946
  pTaskInfo->code = code;
×
947
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
948
  return code;
×
949
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc