• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.79
/source/libs/stream/src/streamSessionState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "tcommon.h"
21
#include "tsimplehash.h"
22

23
#define MAX_SCAN_RANGE_SIZE 102400
24

25
int sessionStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
39,791✔
26
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
39,791✔
27
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
39,775✔
28
  return sessionWinKeyCmpr((SSessionKey*)pWin1, pWin2);
39,775✔
29
}
30

31
int sessionStateRangeKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
1,256✔
32
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
1,256✔
33
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
1,256✔
34
  return sessionRangeKeyCmpr(pWin1, pWin2);
1,256✔
35
}
36

37
int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn) {
29,073✔
38
  int firstPos = 0, lastPos = num - 1, midPos = -1;
29,073✔
39
  int numOfRows = 0;
29,073✔
40

41
  if (num <= 0) return -1;
29,073✔
42
  // find the first position which is smaller or equal than the key.
43
  // if all data is bigger than the key return -1
44
  while (1) {
45
    if (cmpFn(key, keyList, lastPos) >= 0) return lastPos;
29,843✔
46
    if (cmpFn(key, keyList, firstPos) == 0) return firstPos;
7,662✔
47
    if (cmpFn(key, keyList, firstPos) < 0) return firstPos - 1;
6,655✔
48

49
    numOfRows = lastPos - firstPos + 1;
3,165✔
50
    midPos = (numOfRows >> 1) + firstPos;
3,165✔
51

52
    if (cmpFn(key, keyList, midPos) < 0) {
3,165✔
53
      lastPos = midPos - 1;
1,640✔
54
    } else if (cmpFn(key, keyList, midPos) > 0) {
1,528✔
55
      firstPos = midPos + 1;
820✔
56
    } else {
57
      break;
711✔
58
    }
59
  }
60

61
  return midPos;
711✔
62
}
63

64
int64_t getSessionWindowEndkey(void* data, int32_t index) {
×
65
  SArray*       pWinInfos = (SArray*)data;
×
66
  SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
×
67
  if (ppos != NULL) {
×
UNCOV
68
    SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
×
69
    return pWin->win.ekey;
×
70
  } else {
UNCOV
71
    return 0;
×
72
  }
73
}
74

75
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
10,112✔
76
  if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) {
10,112✔
77
    return true;
2,052✔
78
  }
79
  return false;
8,060✔
80
}
81

82
SStreamStateCur* createStateCursor(SStreamFileState* pFileState) {
13,397✔
83
  SStreamStateCur* pCur = createStreamStateCursor();
13,397✔
84
  if (pCur == NULL) {
13,406!
UNCOV
85
    return NULL;
×
86
  }
87
  pCur->pStreamFileState = pFileState;
13,406✔
88
  return pCur;
13,406✔
89
}
90

91
static int32_t addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
5,902✔
92
                                   SRowBuffPos** ppPos) {
93
  int32_t      code = TSDB_CODE_SUCCESS;
5,902✔
94
  int32_t      lino = 0;
5,902✔
95
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
5,902✔
96
  if (!pNewPos || !pNewPos->pRowBuff) {
5,908!
UNCOV
97
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
98
    QUERY_CHECK_CODE(code, lino, _end);
×
99
  }
100

101
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
5,908✔
102
  void* tmp = taosArrayPush(pWinInfos, &pNewPos);
5,909✔
103
  if (!tmp) {
5,909!
UNCOV
104
    code = terrno;
×
UNCOV
105
    QUERY_CHECK_CODE(code, lino, _end);
×
106
  }
107
  (*ppPos) = pNewPos;
5,909✔
108

109
_end:
5,909✔
110
  if (code != TSDB_CODE_SUCCESS) {
5,909!
UNCOV
111
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
112
  }
113
  return code;
5,909✔
114
}
115

116
static int32_t insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
152✔
117
                                      int32_t index, SRowBuffPos** ppPos) {
118
  int32_t      code = TSDB_CODE_SUCCESS;
152✔
119
  int32_t      lino = 0;
152✔
120
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
152✔
121
  if (!pNewPos || !pNewPos->pRowBuff) {
152!
UNCOV
122
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
123
    QUERY_CHECK_CODE(code, lino, _end);
×
124
  }
125

126
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
152✔
127
  void* tmp = taosArrayInsert(pWinInfos, index, &pNewPos);
152✔
128
  if (!tmp) {
152!
UNCOV
129
    code = terrno;
×
UNCOV
130
    QUERY_CHECK_CODE(code, lino, _end);
×
131
  }
132

133
  *ppPos = pNewPos;
152✔
134

135
_end:
152✔
136
  if (code != TSDB_CODE_SUCCESS) {
152!
UNCOV
137
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
138
  }
139
  return code;
152✔
140
}
141

142
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) {
491✔
143
  int32_t      code = TSDB_CODE_SUCCESS;
491✔
144
  int32_t      lino = 0;
491✔
145
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
491✔
146
  if (!pNewPos || !pNewPos->pRowBuff) {
491!
UNCOV
147
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
148
    QUERY_CHECK_CODE(code, lino, _end);
×
149
  }
150

151
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
491✔
152
  pNewPos->needFree = true;
491✔
153
  pNewPos->beFlushed = true;
491✔
154
  if (p) {
491✔
155
    memcpy(pNewPos->pRowBuff, p, *pVLen);
482✔
156
  } else {
157
    int32_t len = getRowStateRowSize(pFileState);
9✔
158
    memset(pNewPos->pRowBuff, 0, len);
9✔
159
  }
160

161
_end:
491✔
162
  taosMemoryFree(p);
491!
163
  if (code != TSDB_CODE_SUCCESS) {
491!
UNCOV
164
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
165
    return NULL;
×
166
  }
167
  return pNewPos;
491✔
168
}
169

170
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen,
3,348✔
171
                                int32_t* pWinCode) {
172
  int32_t code = TSDB_CODE_SUCCESS;
3,348✔
173
  int32_t lino = 0;
3,348✔
174
  (*pWinCode) = TSDB_CODE_SUCCESS;
3,348✔
175
  SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
3,348✔
176
  SArray*    pWinStates = NULL;
3,348✔
177
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
3,348✔
178
  if (ppBuff) {
3,348✔
179
    pWinStates = (SArray*)(*ppBuff);
2,234✔
180
  } else {
181
    pWinStates = taosArrayInit(16, POINTER_BYTES);
1,114✔
182
    if (!pWinStates) {
1,115!
UNCOV
183
      code = terrno;
×
UNCOV
184
      QUERY_CHECK_CODE(code, lino, _end);
×
185
    }
186
    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
1,115✔
187
    QUERY_CHECK_CODE(code, lino, _end);
1,113!
188
  }
189

190
  TSKEY startTs = pKey->win.skey;
3,347✔
191
  TSKEY endTs = pKey->win.ekey;
3,347✔
192

193
  int32_t size = taosArrayGetSize(pWinStates);
3,347✔
194
  if (size == 0) {
3,346✔
195
    void*   pFileStore = getStateFileStore(pFileState);
1,201✔
196
    void*   p = NULL;
1,201✔
197
    int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
1,201✔
198
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
1,203✔
199
      (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
57✔
200
      if (!(*pVal)) {
57!
UNCOV
201
        code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
202
        QUERY_CHECK_CODE(code, lino, _end);
×
203
      }
204

205
      (*pWinCode) = code_file;
57✔
206
      qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
57✔
207
    } else {
208
      code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
1,147✔
209
      (*pWinCode) = TSDB_CODE_FAILED;
1,147✔
210
      taosMemoryFree(p);
1,147!
211
      QUERY_CHECK_CODE(code, lino, _end);
1,147!
212
    }
213
    goto _end;
1,204✔
214
  }
215

216
  // find the first position which is smaller than the pKey
217
  int32_t      index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
2,145✔
218
  SRowBuffPos* pPos = NULL;
2,144✔
219

220
  if (index >= 0) {
2,144✔
221
    pPos = taosArrayGetP(pWinStates, index);
1,842✔
222
    if (inSessionWindow(pPos->pKey, startTs, gap)) {
1,842✔
223
      (*pVal) = pPos;
801✔
224
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
801✔
225
      pPos->beUsed = true;
801✔
226
      pPos->beFlushed = false;
801✔
227
      *pKey = *pDestWinKey;
801✔
228
      goto _end;
801✔
229
    }
230
  }
231

232
  if (index + 1 < size) {
1,344✔
233
    pPos = taosArrayGetP(pWinStates, index + 1);
406✔
234
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap))) {
406!
235
      (*pVal) = pPos;
79✔
236
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
79✔
237
      pPos->beUsed = true;
79✔
238
      pPos->beFlushed = false;
79✔
239
      *pKey = *pDestWinKey;
79✔
240
      goto _end;
79✔
241
    }
242
  }
243

244
  if (index + 1 == 0) {
1,265✔
245
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
274!
246
      void*   p = NULL;
250✔
247
      void*   pFileStore = getStateFileStore(pFileState);
250✔
248
      int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
249✔
249
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
249!
250
        (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
249✔
251
        if (!(*pVal)) {
249!
UNCOV
252
          code = TSDB_CODE_OUT_OF_MEMORY;
×
253
          QUERY_CHECK_CODE(code, lino, _end);
249!
254
        }
255

256
        (*pWinCode) = code_file;
249✔
257
        qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
249✔
258
        goto _end;
249✔
259
      } else {
UNCOV
260
        taosMemoryFree(p);
×
261
      }
262
    }
263
  }
264

265
  if (index == size - 1) {
1,015✔
266
    code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
938✔
267
    QUERY_CHECK_CODE(code, lino, _end);
938!
268

269
    (*pWinCode) = TSDB_CODE_FAILED;
938✔
270
    goto _end;
938✔
271
  }
272

273
  code = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1, (SRowBuffPos**)pVal);
77✔
274
  QUERY_CHECK_CODE(code, lino, _end);
77!
275

276
  (*pWinCode) = TSDB_CODE_FAILED;
77✔
277

278
_end:
3,348✔
279
  if (code != TSDB_CODE_SUCCESS) {
3,348!
UNCOV
280
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
281
  }
282
  return code;
3,347✔
283
}
284

285
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
93✔
286
                          int32_t* pWinCode) {
287
  SWinKey*    pTmpkey = pKey;
93✔
288
  SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts};
93✔
289
  return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen, pWinCode);
93✔
290
}
291

292
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
140✔
293
  int32_t      code = TSDB_CODE_SUCCESS;
140✔
294
  int32_t      lino = 0;
140✔
295
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
140✔
296
  SSessionKey* pKey = pPos->pKey;
140✔
297
  SArray*      pWinStates = NULL;
140✔
298
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
140✔
299
  if (ppBuff) {
140✔
300
    pWinStates = (SArray*)(*ppBuff);
109✔
301
  } else {
302
    pWinStates = taosArrayInit(16, POINTER_BYTES);
31✔
303
    if (!pWinStates) {
31!
UNCOV
304
      code = terrno;
×
UNCOV
305
      QUERY_CHECK_CODE(code, lino, _end);
×
306
    }
307

308
    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
31✔
309
    QUERY_CHECK_CODE(code, lino, _end);
31!
310
  }
311

312
  int32_t size = taosArrayGetSize(pWinStates);
140✔
313
  if (size == 0) {
140✔
314
    void* tmp = taosArrayPush(pWinStates, &pPos);
128✔
315
    if (!tmp) {
128!
UNCOV
316
      code = terrno;
×
UNCOV
317
      QUERY_CHECK_CODE(code, lino, _end);
×
318
    }
319
    goto _end;
128✔
320
  }
321

322
  // find the first position which is smaller than the pKey
323
  int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
12✔
324
  if (index >= 0) {
12!
325
    void* tmp = taosArrayInsert(pWinStates, index, &pPos);
×
326
    if (!tmp) {
×
UNCOV
327
      code = terrno;
×
UNCOV
328
      QUERY_CHECK_CODE(code, lino, _end);
×
329
    }
330
  } else {
331
    void* tmp = taosArrayInsert(pWinStates, 0, &pPos);
12✔
332
    if (!tmp) {
12!
UNCOV
333
      code = terrno;
×
UNCOV
334
      QUERY_CHECK_CODE(code, lino, _end);
×
335
    }
336
  }
337

338
_end:
12✔
339
  pPos->needFree = false;
140✔
340
  if (code != TSDB_CODE_SUCCESS) {
140!
UNCOV
341
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
342
  }
343
  return code;
140✔
344
}
345

346
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen,
338✔
347
                              int32_t* pWinCode) {
348
  int32_t      code = TSDB_CODE_SUCCESS;
338✔
349
  int32_t      lino = 0;
338✔
350
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
338✔
351
  if (!pNewPos || !pNewPos->pRowBuff) {
338!
UNCOV
352
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
353
    QUERY_CHECK_CODE(code, lino, _end);
×
354
  }
355
  pNewPos->needFree = true;
338✔
356
  pNewPos->beFlushed = true;
338✔
357
  void* pBuff = NULL;
338✔
358
  (*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
338✔
359
  if ((*pWinCode) != TSDB_CODE_SUCCESS) {
338!
UNCOV
360
    goto _end;
×
361
  }
362
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
338✔
363
  memcpy(pNewPos->pRowBuff, pBuff, *pVLen);
338✔
364
  taosMemoryFreeClear(pBuff);
338!
365
  (*pVal) = pNewPos;
338✔
366

367
_end:
338✔
368
  if (code != TSDB_CODE_SUCCESS) {
338!
UNCOV
369
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
370
  }
371
  return code;
338✔
372
}
373

374
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) {
2,084✔
375
  SSHashObj*   pSessionBuff = (SSHashObj*)pBuff;
2,084✔
376
  SSessionKey* pWinKey = (SSessionKey*)key;
2,084✔
377
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
2,084✔
378
  if (!ppBuff) {
2,085✔
379
    return TSDB_CODE_SUCCESS;
509✔
380
  }
381
  SArray* pWinStates = (SArray*)(*ppBuff);
1,576✔
382
  int32_t size = taosArrayGetSize(pWinStates);
1,576✔
383
  TSKEY   gap = 0;
1,577✔
384
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
1,577✔
385
  if (index >= 0) {
1,574✔
386
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
1,089✔
387
    if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) {
1,091!
388
      pPos->beFlushed = true;
1,090✔
389
      taosArrayRemove(pWinStates, index);
1,090✔
390
    }
391
  }
392
  return TSDB_CODE_SUCCESS;
1,575✔
393
}
394

395
void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
408✔
396
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
408✔
397
  SSessionKey* pWinKey = (SSessionKey*)pPos->pKey;
408✔
398
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
408✔
399
  if (!ppBuff) {
408!
UNCOV
400
    return;
×
401
  }
402
  SArray* pWinStates = (SArray*)(*ppBuff);
408✔
403
  int32_t size = taosArrayGetSize(pWinStates);
408✔
404
  TSKEY   gap = 0;
408✔
405
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
408✔
406
  if (index >= 0) {
408✔
407
    SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index);
87✔
408
    if (pItemPos == pPos) {
87✔
409
      taosArrayRemove(pWinStates, index);
82✔
410
    }
411
  }
412
}
413

414
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
444✔
415
                                           const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
416
  int32_t      code = TSDB_CODE_SUCCESS;
444✔
417
  int32_t      lino = 0;
444✔
418
  SRowBuffPos* pNewPos = NULL;
444✔
419
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
444✔
420
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
444✔
421
  SArray*      pWinStates = NULL;
451✔
422
  if (!ppBuff) {
451✔
423
    pWinStates = taosArrayInit(16, POINTER_BYTES);
92✔
424
    if (!pWinStates) {
92!
UNCOV
425
      code = terrno;
×
UNCOV
426
      QUERY_CHECK_CODE(code, lino, _end);
×
427
    }
428

429
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
92✔
430
    QUERY_CHECK_CODE(code, lino, _end);
92!
431
  } else {
432
    pWinStates = (SArray*)(*ppBuff);
359✔
433
  }
434
  if (!pCur) {
451✔
435
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
94✔
436
    QUERY_CHECK_CODE(code, lino, _end);
94!
437

438
    goto _end;
94✔
439
  }
440

441
  int32_t size = taosArrayGetSize(pWinStates);
357✔
442
  if (pCur->buffIndex >= 0) {
353✔
443
    if (pCur->buffIndex >= size) {
341✔
444
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
302✔
445
      QUERY_CHECK_CODE(code, lino, _end);
306!
446

447
      goto _end;
306✔
448
    }
449
    code = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex, &pNewPos);
39✔
450
    QUERY_CHECK_CODE(code, lino, _end);
39!
451

452
    goto _end;
39✔
453
  } else {
454
    if (size > 0) {
12✔
455
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
10✔
456
      if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >= 0) {
10!
457
        // pCur is invalid
458
        SSessionKey pTmpKey = *pWinKey;
×
459
        int32_t     winCode = TSDB_CODE_SUCCESS;
×
460
        code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen, &winCode);
×
461
        QUERY_CHECK_CONDITION((winCode == TSDB_CODE_FAILED), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
UNCOV
462
        QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
463
        goto _end;
×
464
      }
465
    }
466
    pNewPos = getNewRowPosForWrite(pFileState);
12✔
467
    if (!pNewPos || !pNewPos->pRowBuff) {
12!
UNCOV
468
      code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
469
      QUERY_CHECK_CODE(code, lino, _end);
×
470
    }
471

472
    memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
12✔
473
    pNewPos->needFree = true;
12✔
474
    pNewPos->beFlushed = true;
12✔
475
  }
476

477
_end:
451✔
478
  (*ppVal) = pNewPos;
451✔
479
  if (code != TSDB_CODE_SUCCESS) {
451!
UNCOV
480
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
481
  }
482
  return code;
451✔
483
}
484

485
void sessionWinStateClear(SStreamFileState* pFileState) {
510✔
486
  int32_t buffSize = getRowStateRowSize(pFileState);
510✔
487
  void*   pIte = NULL;
510✔
488
  size_t  keyLen = 0;
510✔
489
  int32_t iter = 0;
510✔
490
  void*   pBuff = getRowStateBuff(pFileState);
510✔
491
  while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
2,159✔
492
    SArray* pWinStates = *((void**)pIte);
1,649✔
493
    int32_t size = taosArrayGetSize(pWinStates);
1,649✔
494
    for (int32_t i = 0; i < size; i++) {
3,526✔
495
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
1,877✔
496
      memset(pPos->pRowBuff, 0, buffSize);
1,877✔
497
    }
498
  }
499
}
510✔
500

501
void freeArrayPtr(void* ptr) {
3,530✔
502
  SArray* pArray = *(void**)ptr;
3,530✔
503
  taosArrayDestroy(pArray);
3,530✔
504
}
3,530✔
505

506
void sessionWinStateCleanup(void* pBuff) {
7,986✔
507
  if (pBuff == NULL) {
7,986✔
508
    return;
5,797✔
509
  }
510
  tSimpleHashSetFreeFp(pBuff, freeArrayPtr);
2,189✔
511
  tSimpleHashCleanup(pBuff);
2,189✔
512
}
513

514
static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, const SSessionKey* pWinKey,
14,143✔
515
                                                SArray** pWins, int32_t* pIndex) {
516
  SStreamStateCur* pCur = NULL;
14,143✔
517
  SSHashObj*       pSessionBuff = getRowStateBuff(pFileState);
14,143✔
518
  void**           ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
14,144✔
519
  if (!ppBuff) {
14,162✔
520
    return NULL;
1,147✔
521
  }
522

523
  SArray* pWinStates = (SArray*)(*ppBuff);
13,015✔
524
  int32_t size = taosArrayGetSize(pWinStates);
13,015✔
525
  TSKEY   gap = 0;
13,014✔
526
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
13,014✔
527

528
  if (pWins) {
13,006✔
529
    (*pWins) = pWinStates;
8,124✔
530
  }
531

532
  if (size > 0 && index == -1) {
13,006✔
533
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
1,844✔
534
    SSessionKey* pWin = (SSessionKey*)pPos->pKey;
1,844✔
535
    if (pWinKey->win.skey == pWin->win.skey) {
1,844✔
536
      index = 0;
289✔
537
    }
538
  }
539

540
  if (index >= 0) {
13,006✔
541
    pCur = createStateCursor(pFileState);
10,206✔
542
    if (pCur == NULL) {
10,205!
UNCOV
543
      return NULL;
×
544
    }
545
    pCur->buffIndex = index;
10,205✔
546
    if (pIndex) {
10,205✔
547
      *pIndex = index;
6,844✔
548
    }
549
  }
550

551
  return pCur;
13,005✔
552
}
553

554
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
5,192✔
555
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, NULL, NULL);
5,192✔
556
  if (pCur) {
5,197✔
557
    return pCur;
3,358✔
558
  }
559

560
  void* pFileStore = getStateFileStore(pFileState);
1,839✔
561
  pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pFileStore, pWinKey);
1,841✔
562
  if (!pCur) {
1,841✔
563
    return NULL;
767✔
564
  }
565
  pCur->buffIndex = -1;
1,074✔
566
  pCur->pStreamFileState = pFileState;
1,074✔
567
  return pCur;
1,074✔
568
}
569

570
SStreamStateCur* sessionWinStateSeekKeyPrev(SStreamFileState *pFileState, const SSessionKey *pWinKey) {
×
571
  SArray*          pWinStates = NULL;
×
572
  int32_t          index = -1;
×
573
  SStreamStateCur *pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
×
574
  if (pCur) {
×
575
    int32_t cmpRes= sessionStateRangeKeyCompare(pWinKey, pWinStates, index);
×
576
    if (cmpRes > 0) {
×
UNCOV
577
      return pCur;
×
578
    } else if (cmpRes == 0 && index > 0) {
×
579
      sessionWinStateMoveToPrev(pCur);
×
UNCOV
580
      return pCur;
×
581
    }
582
    streamStateFreeCur(pCur);
×
583
    pCur = NULL;
×
584
  }
585

UNCOV
586
  void* pFileStore = getStateFileStore(pFileState);
×
587
  pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
×
588
  if (!pCur) {
×
589
    return NULL;
×
590
  }
UNCOV
591
  pCur->buffIndex = -1;
×
UNCOV
592
  pCur->pStreamFileState = pFileState;
×
UNCOV
593
  return pCur;
×
594
}
595

596
static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) {
520✔
597
  if (!pCur) {
520!
UNCOV
598
    return;
×
599
  }
600
  streamStateResetCur(pCur);
520✔
601
  pCur->buffIndex = 0;
520✔
602
  pCur->pStreamFileState = pFileState;
520✔
603
}
604

605
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
1,894✔
606
                                    SStreamStateCur** ppCur) {
607
  SSessionKey key = {.groupId = groupId};
1,894✔
608
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), *ppCur, &key, NULL, NULL);
1,894✔
609
  if (taosArrayGetSize(pWinStates) > 0 &&
1,894✔
610
      (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
148!
611
    if (!(*ppCur)) {
482✔
612
      (*ppCur) = createStateCursor(pFileState);
481✔
613
    }
614
    transformCursor(pFileState, *ppCur);
482✔
615
  } else if (*ppCur) {
1,412✔
616
    (*ppCur)->buffIndex = -1;
597✔
617
    (*ppCur)->pStreamFileState = pFileState;
597✔
618
  }
619
}
1,894✔
620

621
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
321✔
622
  SArray*          pWinStates = NULL;
321✔
623
  int32_t          index = -1;
321✔
624
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
321✔
625
  if (pCur) {
321✔
626
    if (sessionStateRangeKeyCompare(pWinKey, pWinStates, index) > 0) {
163✔
627
      sessionWinStateMoveToNext(pCur);
27✔
628
    }
629
    return pCur;
163✔
630
  }
631

632
  void* pFileStore = getStateFileStore(pFileState);
158✔
633
  pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pFileStore, (SSessionKey*)pWinKey);
158✔
634
  checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
158✔
635
  return pCur;
158✔
636
}
637

638
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
8,257✔
639
  SArray*          pWinStates = NULL;
8,257✔
640
  int32_t          index = -1;
8,257✔
641
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
8,257✔
642
  if (pCur) {
8,261✔
643
    sessionWinStateMoveToNext(pCur);
6,525✔
644
    return pCur;
6,520✔
645
  }
646

647
  void* pFileStore = getStateFileStore(pFileState);
1,736✔
648
  pCur = streamStateSessionSeekKeyNext_rocksdb(pFileStore, pWinKey);
1,736✔
649
  checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
1,736✔
650
  return pCur;
1,736✔
651
}
652

653
SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey, COUNT_TYPE count) {
374✔
654
  SArray*          pWinStates = NULL;
374✔
655
  int32_t          index = -1;
374✔
656
  SStreamStateCur* pBuffCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
374✔
657
  int32_t          resSize = getRowStateRowSize(pFileState);
374✔
658
  COUNT_TYPE       winCount = 0;
374✔
659
  if (pBuffCur) {
374✔
660
    while (index >= 0) {
331✔
661
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
206✔
662
      winCount = *((COUNT_TYPE*)((char*)pPos->pRowBuff + (resSize - sizeof(COUNT_TYPE))));
206✔
663
      if (sessionStateRangeKeyCompare(pWinKey, pWinStates, index) == 0 || winCount < count) {
206✔
664
        index--;
174✔
665
      } else if (index >= 0) {
32!
666
        pBuffCur->buffIndex = index + 1;
32✔
667
        return pBuffCur;
32✔
668
      }
669
    }
670
    pBuffCur->buffIndex = 0;
125✔
671
  } else if (taosArrayGetSize(pWinStates) > 0) {
217!
672
    pBuffCur = createStateCursor(pFileState);
×
UNCOV
673
    if (pBuffCur == NULL) {
×
UNCOV
674
      return NULL;
×
675
    }
UNCOV
676
    pBuffCur->buffIndex = 0;
×
677
  }
678

679
  void*            pFileStore = getStateFileStore(pFileState);
342✔
680
  SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
342✔
681
  if (pCur) {
342!
682
    pCur->pStreamFileState = pFileState;
×
683
    SSessionKey key = {0};
×
684
    void*       pVal = NULL;
×
685
    int         len = 0;
×
UNCOV
686
    int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &len);
×
687
    if (code == TSDB_CODE_FAILED) {
×
688
      streamStateFreeCur(pCur);
×
689
      return pBuffCur;
×
690
    }
691
    winCount = *((COUNT_TYPE*)((char*)pVal + (resSize - sizeof(COUNT_TYPE))));
×
692
    taosMemoryFreeClear(pVal);
×
UNCOV
693
    streamStateFreeCur(pBuffCur);
×
694
    if (sessionRangeKeyCmpr(pWinKey, &key) != 0 && winCount == count) {
×
UNCOV
695
      streamStateCurNext(pFileStore, pCur);
×
696
      return pCur;
×
697
    }
698
    streamStateCurPrev(pFileStore, pCur);
×
699
    while (1) {
UNCOV
700
      code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len);
×
701
      if (code == TSDB_CODE_FAILED) {
×
702
        streamStateCurNext(pFileStore, pCur);
×
703
        return pCur;
×
704
      }
UNCOV
705
      winCount = *((COUNT_TYPE*)((char*)pVal + (resSize - sizeof(COUNT_TYPE))));
×
706
      taosMemoryFreeClear(pVal);
×
707
      if (sessionRangeKeyCmpr(pWinKey, &key) == 0 || winCount < count) {
×
UNCOV
708
        streamStateCurPrev(pFileStore, pCur);
×
709
      } else {
UNCOV
710
        streamStateCurNext(pFileStore, pCur);
×
UNCOV
711
        return pCur;
×
712
      }
713
    }
714
  }
715
  return pBuffCur;
342✔
716
}
717

718
int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
15,058✔
719
  if (!pCur) {
15,058✔
720
    return TSDB_CODE_FAILED;
1,060✔
721
  }
722
  int32_t code = TSDB_CODE_SUCCESS;
13,998✔
723

724
  SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState);
13,998✔
725
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
13,996✔
726
  SArray*    pWinStates = NULL;
14,013✔
727
  if (ppBuff) {
14,013✔
728
    pWinStates = (SArray*)(*ppBuff);
13,596✔
729
  }
730

731
  if (pCur->buffIndex >= 0) {
14,013✔
732
    int32_t size = taosArrayGetSize(pWinStates);
12,216✔
733
    if (pCur->buffIndex >= size) {
12,214✔
734
      return TSDB_CODE_FAILED;
7,057✔
735
    }
736
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
5,157✔
737
    if (pVal) {
5,153✔
738
      *pVal = pPos;
1,173✔
739
    }
740
    *pKey = *(SSessionKey*)(pPos->pKey);
5,153✔
741
  } else {
742
    void* pData = NULL;
1,797✔
743
    code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, &pData, pVLen);
1,797✔
744
    if (taosArrayGetSize(pWinStates) > 0 &&
1,797✔
745
        (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) {
887!
746
      transformCursor(pCur->pStreamFileState, pCur);
38✔
747
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
38✔
748
      if (pVal) {
38!
UNCOV
749
        *pVal = pPos;
×
750
      }
751
      *pKey = *(SSessionKey*)(pPos->pKey);
38✔
752
      code = TSDB_CODE_SUCCESS;
38✔
753
    } else if (code == TSDB_CODE_SUCCESS && pVal) {
1,759✔
754
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState);
196✔
755
      if (!pNewPos || !pNewPos->pRowBuff) {
196!
UNCOV
756
        code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
757
        taosMemoryFreeClear(pData);
×
UNCOV
758
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
759
        return code;
×
760
      }
761
      memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
196✔
762
      pNewPos->needFree = true;
196✔
763
      pNewPos->beFlushed = true;
196✔
764
      memcpy(pNewPos->pRowBuff, pData, *pVLen);
196✔
765
      (*pVal) = pNewPos;
196✔
766
    }
767
    taosMemoryFreeClear(pData);
1,797!
768
  }
769
  return code;
6,950✔
770
}
771

772
void sessionWinStateMoveToNext(SStreamStateCur* pCur) {
11,772✔
773
  qTrace("move cursor to next");
11,772✔
774
  if (pCur && pCur->buffIndex >= 0) {
11,772✔
775
    pCur->buffIndex++;
8,089✔
776
  } else {
777
    streamStateCurNext_rocksdb(pCur);
3,683✔
778
  }
779
}
11,777✔
780

UNCOV
781
void sessionWinStateMoveToPrev(SStreamStateCur* pCur) {
×
782
  qTrace("move cursor to prev");
×
UNCOV
783
  if (pCur && pCur->buffIndex >= 1) {
×
784
    pCur->buffIndex--;
×
785
  } else {
UNCOV
786
    streamStateCurPrev_rocksdb(pCur);
×
787
  }
UNCOV
788
}
×
789

790
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey,
4,625✔
791
                                     range_cmpr_fn cmpFn) {
792
  SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key);
4,625✔
793
  SSessionKey      tmpKey = *key;
4,633✔
794
  int32_t          code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
4,633✔
795
  bool             hasCurrentPrev = true;
4,631✔
796
  if (code == TSDB_CODE_FAILED) {
4,631✔
797
    streamStateFreeCur(pCur);
849✔
798
    pCur = sessionWinStateSeekKeyNext(pFileState, key);
849✔
799
    code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
849✔
800
    hasCurrentPrev = false;
849✔
801
  }
802

803
  if (code == TSDB_CODE_FAILED) {
4,631✔
804
    code = TSDB_CODE_FAILED;
606✔
805
    goto _end;
606✔
806
  }
807

808
  if (cmpFn(key, &tmpKey) == 0) {
4,025✔
809
    *curKey = tmpKey;
2,902✔
810
    goto _end;
2,902✔
811
  } else if (!hasCurrentPrev) {
1,123✔
812
    code = TSDB_CODE_FAILED;
222✔
813
    goto _end;
222✔
814
  }
815

816
  sessionWinStateMoveToNext(pCur);
901✔
817
  code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
901✔
818
  if (code == TSDB_CODE_SUCCESS && cmpFn(key, &tmpKey) == 0) {
902✔
819
    *curKey = tmpKey;
398✔
820
  } else {
821
    code = TSDB_CODE_FAILED;
504✔
822
  }
823

824
_end:
4,632✔
825
  streamStateFreeCur(pCur);
4,632✔
826
  return code;
4,629✔
827
}
828

829
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
3,287✔
830
                              state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
831
  int32_t code = TSDB_CODE_SUCCESS;
3,287✔
832
  int32_t lino = 0;
3,287✔
833
  (*pWinCode) = TSDB_CODE_SUCCESS;
3,287✔
834

835
  SSessionKey* pWinKey = key;
3,287✔
836
  TSKEY        gap = 0;
3,287✔
837
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
3,287✔
838
  SArray*      pWinStates = NULL;
3,288✔
839

840
  void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
3,288✔
841
  if (ppBuff) {
3,288✔
842
    pWinStates = (SArray*)(*ppBuff);
2,856✔
843
  } else {
844
    pWinStates = taosArrayInit(16, POINTER_BYTES);
432✔
845
    if (!pWinStates) {
432!
UNCOV
846
      code = terrno;
×
UNCOV
847
      QUERY_CHECK_CODE(code, lino, _end);
×
848
    }
849

850
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
432✔
851
    QUERY_CHECK_CODE(code, lino, _end);
432!
852
  }
853

854
  TSKEY startTs = pWinKey->win.skey;
3,288✔
855
  TSKEY endTs = pWinKey->win.ekey;
3,288✔
856

857
  int32_t size = taosArrayGetSize(pWinStates);
3,288✔
858
  if (size == 0) {
3,288✔
859
    void*   pFileStore = getStateFileStore(pFileState);
740✔
860
    void*   p = NULL;
740✔
861
    int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
740✔
862
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
740✔
863
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
25✔
864
      if (!(*pVal)) {
25!
UNCOV
865
        code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
866
        QUERY_CHECK_CODE(code, lino, _end);
×
867
      }
868

869
      (*pWinCode) = code_file;
25✔
870
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
25✔
871
             pWinKey->win.ekey, code_file);
872
    } else {
873
      code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
715✔
874
      (*pWinCode) = TSDB_CODE_FAILED;
715✔
875
      taosMemoryFree(p);
715!
876
      QUERY_CHECK_CODE(code, lino, _end);
715!
877
    }
878
    goto _end;
740✔
879
  }
880

881
  // find the first position which is smaller than the pWinKey
882
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
2,548✔
883
  SRowBuffPos* pPos = NULL;
2,548✔
884
  int32_t      valSize = *pVLen;
2,548✔
885

886
  if (index >= 0) {
2,548✔
887
    pPos = taosArrayGetP(pWinStates, index);
2,436✔
888
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
2,436✔
889
    if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) {
2,436✔
890
      (*pVal) = pPos;
1,956✔
891
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
1,956✔
892
      pPos->beUsed = true;
1,956✔
893
      pPos->beFlushed = false;
1,956✔
894
      *key = *pDestWinKey;
1,956✔
895
      goto _end;
1,956✔
896
    }
897
  }
898

899
  if (index + 1 < size) {
592✔
900
    pPos = taosArrayGetP(pWinStates, index + 1);
141✔
901
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
141✔
902
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ||
280!
903
        fn(pKeyData, stateKey) == true) {
139✔
904
      (*pVal) = pPos;
7✔
905
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
7✔
906
      pPos->beUsed = true;
7✔
907
      pPos->beFlushed = false;
7✔
908
      *key = *pDestWinKey;
7✔
909
      goto _end;
7✔
910
    }
911
  }
912

913
  if (index + 1 == 0) {
585✔
914
    if (!isDeteled(pFileState, endTs)) {
107!
915
      void*   p = NULL;
107✔
916
      void*   pFileStore = getStateFileStore(pFileState);
107✔
917
      int32_t code_file =
918
          streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
107✔
919
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
107✔
920
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
98✔
921
        if (!(*pVal)) {
98!
UNCOV
922
          code = TSDB_CODE_OUT_OF_MEMORY;
×
923
          QUERY_CHECK_CODE(code, lino, _end);
98!
924
        }
925

926
        (*pWinCode) = code_file;
98✔
927
        qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
98✔
928
               pWinKey->win.ekey, code_file);
929
        goto _end;
98✔
930
      } else {
931
        taosMemoryFree(p);
9!
932
      }
933
    }
934
  }
935

936
  if (index == size - 1) {
487✔
937
    code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
451✔
938
    QUERY_CHECK_CODE(code, lino, _end);
451!
939

940
    (*pWinCode) = TSDB_CODE_FAILED;
451✔
941
    goto _end;
451✔
942
  }
943
  code = insertNewSessionWindow(pFileState, pWinStates, key, index + 1, (SRowBuffPos**)pVal);
36✔
944
  QUERY_CHECK_CODE(code, lino, _end);
36!
945

946
  (*pWinCode) = TSDB_CODE_FAILED;
36✔
947

948
_end:
3,288✔
949
  if (code != TSDB_CODE_SUCCESS) {
3,288!
UNCOV
950
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
951
  }
952
  return code;
3,288✔
953
}
954

955
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
1,548✔
956
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
1,548✔
957
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
1,545✔
958
  streamStateFreeCur(pCur);
1,547✔
959
  if (code == TSDB_CODE_SUCCESS) {
1,547!
UNCOV
960
    return code;
×
961
  } else {
962
    pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
1,547✔
963
  }
964

965
  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
1,548✔
966
  streamStateFreeCur(pCur);
1,548✔
967
  return code;
1,548✔
968
}
969

970
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
5,011✔
971
                              int32_t* pVLen, int32_t* pWinCount) {
972
  int32_t code = TSDB_CODE_SUCCESS;
5,011✔
973
  int32_t lino = 0;
5,011✔
974
  (*pWinCount) = TSDB_CODE_SUCCESS;
5,011✔
975

976
  SSessionKey* pWinKey = pKey;
5,011✔
977
  const TSKEY  gap = 0;
5,011✔
978
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
5,011✔
979
  SArray*      pWinStates = NULL;
5,010✔
980
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
5,010✔
981
  if (ppBuff) {
5,011✔
982
    pWinStates = (SArray*)(*ppBuff);
3,692✔
983
  } else {
984
    pWinStates = taosArrayInit(16, POINTER_BYTES);
1,319✔
985
    if (!pWinStates) {
1,318!
UNCOV
986
      code = terrno;
×
UNCOV
987
      QUERY_CHECK_CODE(code, lino, _end);
×
988
    }
989

990
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
1,318✔
991
    QUERY_CHECK_CODE(code, lino, _end);
1,319!
992
  }
993

994
  TSKEY startTs = pWinKey->win.skey;
5,011✔
995
  TSKEY endTs = pWinKey->win.ekey;
5,011✔
996

997
  int32_t size = taosArrayGetSize(pWinStates);
5,011✔
998
  if (size == 0) {
5,011✔
999
    void* pFileStore = getStateFileStore(pFileState);
1,331✔
1000
    void* pRockVal = NULL;
1,331✔
1001
    (*pWinCount) = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
1,331✔
1002
    if ((*pWinCount) == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
1,331!
1003
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
61✔
1004
             pWinKey->win.ekey, (*pWinCount));
1005
      if ((*pWinCount) == TSDB_CODE_SUCCESS) {
61!
1006
        int32_t     valSize = *pVLen;
61✔
1007
        COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
61✔
1008
        if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
61!
1009
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
52✔
1010
          if (!(*pVal)) {
52!
UNCOV
1011
            code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
1012
            QUERY_CHECK_CODE(code, lino, _end);
×
1013
          }
1014

1015
          goto _end;
52✔
1016
        }
1017
      }
1018
      pWinKey->win.skey = startTs;
9✔
1019
      pWinKey->win.ekey = endTs;
9✔
1020
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
9✔
1021
      taosMemoryFree(pRockVal);
9!
1022
      if (!(*pVal)) {
9!
UNCOV
1023
        code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
1024
        QUERY_CHECK_CODE(code, lino, _end);
×
1025
      }
1026
    } else {
1027
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
1,270✔
1028
      QUERY_CHECK_CODE(code, lino, _end);
1,270!
1029

1030
      (*pWinCount) = TSDB_CODE_FAILED;
1,270✔
1031
    }
1032
    goto _end;
1,279✔
1033
  }
1034

1035
  // find the first position which is smaller than the pWinKey
1036
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
3,680✔
1037
  SRowBuffPos* pPos = NULL;
3,678✔
1038
  int32_t      valSize = *pVLen;
3,678✔
1039

1040
  if (index >= 0) {
3,678✔
1041
    pPos = taosArrayGetP(pWinStates, index);
3,668✔
1042
    COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pPos->pRowBuff) + (valSize - sizeof(COUNT_TYPE)));
3,668✔
1043
    if (inSessionWindow(pPos->pKey, startTs, gap) || (index == size - 1 && (*pWinStateCout) < winCount)) {
3,668✔
1044
      (*pVal) = pPos;
2,896✔
1045
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
2,896✔
1046
      pPos->beUsed = true;
2,896✔
1047
      pPos->beFlushed = false;
2,896✔
1048
      *pWinKey = *pDestWinKey;
2,896✔
1049
      goto _end;
2,896✔
1050
    }
1051
  }
1052

1053
  if (index == -1) {
782✔
1054
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) {
10!
1055
      SSessionKey tmpKey = *pWinKey;
×
1056
      void*       pRockVal = NULL;
×
1057
      void*       pFileStore = getStateFileStore(pFileState);
×
1058
      int32_t     code_file = getCountWinStateFromDisc(pFileStore, &tmpKey, &pRockVal, pVLen);
×
1059
      if (code_file == TSDB_CODE_SUCCESS) {
×
1060
        SRowBuffPos* pFirstPos = taosArrayGetP(pWinStates, 0);
×
1061
        SSessionKey* pFirstWinKey = (SSessionKey*)pFirstPos->pKey;
×
1062
        if (tmpKey.win.ekey < pFirstWinKey->win.skey) {
×
1063
          *pWinKey = tmpKey;
×
UNCOV
1064
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
×
UNCOV
1065
          if (!(*pVal)) {
×
1066
            code = TSDB_CODE_OUT_OF_MEMORY;
×
1067
            QUERY_CHECK_CODE(code, lino, _end);
×
1068
          }
1069

UNCOV
1070
          (*pWinCount) = code_file;
×
UNCOV
1071
          qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
1072
                 pWinKey->win.ekey, code_file);
UNCOV
1073
          goto _end;
×
1074
        }
1075
      }
UNCOV
1076
      taosMemoryFree(pRockVal);
×
1077
    }
1078
  }
1079

1080
  if (index + 1 < size) {
783✔
1081
    pPos = taosArrayGetP(pWinStates, index + 1);
51✔
1082
    (*pVal) = pPos;
51✔
1083
    SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
51✔
1084
    pPos->beUsed = true;
51✔
1085
    pPos->beFlushed = false;
51✔
1086
    *pWinKey = *pDestWinKey;
51✔
1087
    goto _end;
51✔
1088
  }
1089

1090
  code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
732✔
1091
  QUERY_CHECK_CODE(code, lino, _end);
732!
1092

1093
  (*pWinCount) = TSDB_CODE_FAILED;
732✔
1094

1095
_end:
5,010✔
1096
  if (code != TSDB_CODE_SUCCESS) {
5,010!
UNCOV
1097
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1098
  }
1099
  return code;
5,010✔
1100
}
1101

1102
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
256✔
1103
                                 int32_t* pVLen) {
1104
  SSessionKey* pWinKey = pKey;
256✔
1105
  const TSKEY  gap = 0;
256✔
1106
  int32_t      code = TSDB_CODE_SUCCESS;
256✔
1107
  int32_t      lino = 0;
256✔
1108
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
256✔
1109
  SArray*      pWinStates = NULL;
256✔
1110
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
256✔
1111
  if (ppBuff) {
256✔
1112
    pWinStates = (SArray*)(*ppBuff);
46✔
1113
  } else {
1114
    pWinStates = taosArrayInit(16, POINTER_BYTES);
210✔
1115
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
210✔
1116
    QUERY_CHECK_CODE(code, lino, _end);
210!
1117
  }
1118

1119
  TSKEY startTs = pWinKey->win.skey;
256✔
1120
  TSKEY endTs = pWinKey->win.ekey;
256✔
1121

1122
  int32_t size = taosArrayGetSize(pWinStates);
256✔
1123
  if (size == 0) {
256✔
1124
    void* pFileStore = getStateFileStore(pFileState);
217✔
1125
    void* pRockVal = NULL;
217✔
1126

1127
    int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
217✔
1128
    if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) {
217!
1129
      int32_t     valSize = *pVLen;
×
UNCOV
1130
      COUNT_TYPE* pWinStateCount = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
×
1131
      if ((*pWinStateCount) == winCount) {
×
1132
        code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
×
1133
        QUERY_CHECK_CODE(code, lino, _end);
×
1134
      } else {
UNCOV
1135
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
×
1136
        if (!(*pVal)) {
×
UNCOV
1137
          code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
1138
          QUERY_CHECK_CODE(code, lino, _end);
×
1139
        }
UNCOV
1140
        qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
1141
               pWinKey->win.ekey, code_file);
1142
      }
1143
    } else {
1144
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
217✔
1145
      taosMemoryFree(pRockVal);
217!
1146
      QUERY_CHECK_CODE(code, lino, _end);
217!
1147
    }
1148
  } else {
1149
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
39✔
1150
    QUERY_CHECK_CODE(code, lino, _end);
39!
1151
  }
1152

1153
_end:
39✔
1154
  if (code != TSDB_CODE_SUCCESS) {
256!
UNCOV
1155
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1156
  }
1157
  return code;
256✔
1158
}
1159

UNCOV
1160
static int compareRangeKey(const void* pKey, const void* data, int index) {
×
UNCOV
1161
  SScanRange* pRange1 = (SScanRange*) pKey;
×
UNCOV
1162
  SScanRange* pRange2 = taosArrayGet((SArray*)data, index);
×
UNCOV
1163
  if (pRange1->win.skey > pRange2->win.skey) {
×
UNCOV
1164
    return 1;
×
UNCOV
1165
  } else if (pRange1->win.skey < pRange2->win.skey) {
×
UNCOV
1166
    return -1;
×
1167
  }
1168

UNCOV
1169
  if (pRange1->win.ekey > pRange2->win.ekey) {
×
UNCOV
1170
    return 1;
×
UNCOV
1171
  } else if (pRange1->win.ekey < pRange2->win.ekey) {
×
UNCOV
1172
    return -1;
×
1173
  }
1174

UNCOV
1175
  return 0;
×
1176
}
1177

UNCOV
1178
static int32_t scanRangeKeyCmpr(const SScanRange* pRange1, const SScanRange* pRange2) {
×
UNCOV
1179
  if (pRange1->win.skey > pRange2->win.ekey) {
×
UNCOV
1180
    return 1;
×
UNCOV
1181
  } else if (pRange1->win.ekey < pRange2->win.skey) {
×
UNCOV
1182
    return -1;
×
1183
  }
1184

UNCOV
1185
  return 0;
×
1186
}
1187

UNCOV
1188
static int32_t putRangeIdInfo(SScanRange* pRangeKey, uint64_t gpId, uint64_t uId) {
×
UNCOV
1189
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1190
  int32_t lino = 0;
×
1191

UNCOV
1192
  code = tSimpleHashPut(pRangeKey->pUIds, &uId, sizeof(uint64_t), NULL, 0);
×
UNCOV
1193
  QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1194
  code = tSimpleHashPut(pRangeKey->pGroupIds, &gpId, sizeof(uint64_t), NULL, 0);
×
UNCOV
1195
  QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1196
_end:
×
UNCOV
1197
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1198
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1199
  }
UNCOV
1200
  return code;
×
1201
}
1202

UNCOV
1203
void mergeRangeKey(SScanRange* pRangeDest, SScanRange* pRangeSrc) {
×
UNCOV
1204
  pRangeDest->win.skey = TMIN(pRangeDest->win.skey, pRangeSrc->win.skey);
×
UNCOV
1205
  pRangeDest->win.ekey = TMAX(pRangeDest->win.ekey, pRangeSrc->win.ekey);
×
UNCOV
1206
  pRangeDest->calWin.skey = TMIN(pRangeDest->calWin.skey, pRangeSrc->calWin.skey);
×
UNCOV
1207
  pRangeDest->calWin.ekey = TMAX(pRangeDest->calWin.ekey, pRangeSrc->calWin.ekey);
×
UNCOV
1208
}
×
1209

UNCOV
1210
int32_t mergeScanRange(SArray* pRangeArray, SScanRange* pRangeKey, uint64_t gpId, uint64_t uId, int32_t* pIndex, bool* pRes, char* idStr) {
×
UNCOV
1211
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1212
  int32_t lino = 0;
×
UNCOV
1213
  int32_t size = taosArrayGetSize(pRangeArray);
×
UNCOV
1214
  int32_t index = binarySearch(pRangeArray, size, pRangeKey, compareRangeKey);
×
UNCOV
1215
  if (index >= 0) {
×
UNCOV
1216
    SScanRange* pFindRangeKey = (SScanRange*) taosArrayGet(pRangeArray, index);
×
UNCOV
1217
    if (scanRangeKeyCmpr(pFindRangeKey, pRangeKey) == 0) {
×
UNCOV
1218
      mergeRangeKey(pFindRangeKey, pRangeKey);
×
UNCOV
1219
      code = putRangeIdInfo(pFindRangeKey, gpId, uId);
×
UNCOV
1220
      QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1221
      *pRes = true;
×
UNCOV
1222
      goto _end;
×
1223
    }
1224
  }
UNCOV
1225
  if (index + 1 < size) {
×
UNCOV
1226
    SScanRange* pFindRangeKey = (SScanRange*) taosArrayGet(pRangeArray, index + 1);
×
UNCOV
1227
    if (scanRangeKeyCmpr(pFindRangeKey, pRangeKey) == 0) {
×
UNCOV
1228
      mergeRangeKey(pFindRangeKey, pRangeKey);
×
UNCOV
1229
      code = putRangeIdInfo(pFindRangeKey, gpId, uId);
×
UNCOV
1230
      QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1231
      *pRes = true;
×
UNCOV
1232
      goto _end;
×
1233
    }
1234
  }
UNCOV
1235
  (*pIndex) = index;
×
UNCOV
1236
  *pRes = false;
×
1237

UNCOV
1238
_end:
×
UNCOV
1239
  qDebug("===stream===%s mergeScanRange start ts:%" PRId64 ",end ts:%" PRId64 ",group id:%" PRIu64 ", uid:%" PRIu64
×
1240
         ", res:%d", idStr, pRangeKey->win.skey, pRangeKey->win.ekey, gpId, uId, *pRes);
UNCOV
1241
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1242
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1243
  }
UNCOV
1244
  return code;
×
1245
}
1246

UNCOV
1247
int32_t mergeAndSaveScanRange(STableTsDataState* pTsDataState, STimeWindow* pWin, uint64_t gpId, SRecDataInfo* pRecData, int32_t recLen) {
×
UNCOV
1248
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1249
  int32_t lino = 0;
×
1250

UNCOV
1251
  SArray* pRangeArray = pTsDataState->pScanRanges;
×
UNCOV
1252
  uint64_t uId = pRecData->tableUid;
×
1253

UNCOV
1254
  int32_t index = 0;
×
UNCOV
1255
  bool merge = false;
×
UNCOV
1256
  SScanRange rangeKey = {.win = *pWin, .calWin = pRecData->calWin, .pUIds = NULL, .pGroupIds = NULL};
×
UNCOV
1257
  code = mergeScanRange(pRangeArray, &rangeKey, gpId, uId, &index, &merge, pTsDataState->pState->pTaskIdStr);
×
UNCOV
1258
  QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1259
  if (merge == true) {
×
UNCOV
1260
    goto _end;
×
1261
  }
1262

UNCOV
1263
  int32_t size = taosArrayGetSize(pRangeArray);
×
UNCOV
1264
  if (size > MAX_SCAN_RANGE_SIZE) {
×
UNCOV
1265
    SSessionKey sesKey = {.win = *pWin, .groupId = gpId};
×
UNCOV
1266
    void* pVal = NULL;
×
UNCOV
1267
    int32_t len = 0;
×
UNCOV
1268
    int32_t winCode = streamStateSessionGet_rocksdb(pTsDataState->pState, &sesKey, &pVal, &len);
×
UNCOV
1269
    if (winCode != TSDB_CODE_SUCCESS) {
×
UNCOV
1270
      code = streamStateSessionPut_rocksdb(pTsDataState->pState, &sesKey, &uId, sizeof(uint64_t));
×
1271
    } else {
UNCOV
1272
      char* pTempBuf = taosMemoryRealloc(pVal, len + sizeof(uint64_t));
×
UNCOV
1273
      QUERY_CHECK_NULL(pTempBuf, code, lino, _end, terrno);
×
UNCOV
1274
      memcpy(pTempBuf+len, &uId, sizeof(uint64_t));
×
UNCOV
1275
      code = streamStateSessionPut_rocksdb(pTsDataState->pState, &sesKey, pTempBuf, len + sizeof(uint64_t));
×
UNCOV
1276
      taosMemFreeClear(pTempBuf);
×
1277
    }
UNCOV
1278
    QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1279
    goto _end;
×
1280
  }
UNCOV
1281
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
UNCOV
1282
  rangeKey.pGroupIds = tSimpleHashInit(8, hashFn);
×
UNCOV
1283
  rangeKey.pUIds = tSimpleHashInit(8, hashFn);
×
UNCOV
1284
  code = putRangeIdInfo(&rangeKey, gpId, uId);
×
UNCOV
1285
  QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1286
  if (index < 0) {
×
UNCOV
1287
    index = 0;
×
1288
  }
UNCOV
1289
  taosArrayInsert(pRangeArray, index, &rangeKey);
×
1290

UNCOV
1291
_end:
×
UNCOV
1292
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1293
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1294
  }
UNCOV
1295
  return code;
×
1296
}
1297

UNCOV
1298
int32_t mergeSimpleHashMap(SSHashObj* pDest, SSHashObj* pSource) {
×
UNCOV
1299
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1300
  int32_t lino = 0;
×
UNCOV
1301
  void*   pIte = NULL;
×
UNCOV
1302
  int32_t iter = 0;
×
UNCOV
1303
  while ((pIte = tSimpleHashIterate(pSource, pIte, &iter)) != NULL) {
×
UNCOV
1304
    size_t keyLen = 0;
×
UNCOV
1305
    void* pKey = tSimpleHashGetKey(pIte, &keyLen);
×
UNCOV
1306
    code = tSimpleHashPut(pDest, pKey, keyLen, pIte, sizeof(uint64_t));
×
UNCOV
1307
    QUERY_CHECK_CODE(code, lino, _end);
×
1308
  }
UNCOV
1309
  tSimpleHashCleanup(pSource);
×
1310

UNCOV
1311
_end:
×
UNCOV
1312
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1313
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1314
  }
UNCOV
1315
  return code;
×
1316
}
1317

1318
int32_t mergeAllScanRange(STableTsDataState* pTsDataState) {
1✔
1319
  int32_t code = TSDB_CODE_SUCCESS;
1✔
1320
  int32_t lino = 0;
1✔
1321
  SStreamStateCur* pCur = NULL;
1✔
1322
  SArray* pRangeArray = pTsDataState->pScanRanges;
1✔
1323
  if (taosArrayGetSize(pRangeArray) < 2) {
1!
1324
    return code;
1✔
1325
  }
1326

UNCOV
1327
  for (int32_t i = 0; i < taosArrayGetSize(pRangeArray) - 1;) {
×
UNCOV
1328
    SScanRange* pCurRange = taosArrayGet(pRangeArray, i);
×
UNCOV
1329
    SScanRange* pNextRange = taosArrayGet(pRangeArray, i + 1);
×
UNCOV
1330
    if (scanRangeKeyCmpr(pCurRange, pNextRange) == 0) {
×
UNCOV
1331
      pCurRange->win.skey = TMIN(pCurRange->win.skey, pNextRange->win.skey);
×
UNCOV
1332
      pCurRange->win.ekey = TMAX(pCurRange->win.ekey, pNextRange->win.ekey);
×
UNCOV
1333
      code = mergeSimpleHashMap(pCurRange->pGroupIds, pNextRange->pGroupIds);
×
UNCOV
1334
      QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1335
      code = mergeSimpleHashMap(pCurRange->pUIds, pNextRange->pUIds);
×
UNCOV
1336
      QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1337
      taosArrayRemove(pRangeArray, i+1);
×
UNCOV
1338
      continue;
×
1339
    }
UNCOV
1340
    i++;
×
1341
  }
1342

UNCOV
1343
  int32_t winRes = TSDB_CODE_SUCCESS;
×
UNCOV
1344
  pCur = streamStateSessionSeekToLast_rocksdb(pTsDataState->pState, INT64_MAX);
×
UNCOV
1345
  while (winRes == TSDB_CODE_SUCCESS) {
×
UNCOV
1346
    void*       pVal = NULL;
×
UNCOV
1347
    int32_t     vlen = 0;
×
UNCOV
1348
    SSessionKey key = {0};
×
UNCOV
1349
    winRes = streamStateSessionGetKVByCur_rocksdb(pTsDataState->pState, pCur, &key, &pVal, &vlen);
×
UNCOV
1350
    if (winRes != TSDB_CODE_SUCCESS) {
×
UNCOV
1351
      break;
×
1352
    }
UNCOV
1353
    int32_t index = 0;
×
UNCOV
1354
    bool merge = false;
×
UNCOV
1355
    SScanRange tmpRange = {.win = key.win, .pUIds = NULL, .pGroupIds = NULL};
×
UNCOV
1356
    int32_t num = vlen / sizeof(uint64_t);
×
UNCOV
1357
    uint64_t* pUids = (uint64_t*) pVal;
×
UNCOV
1358
    for (int32_t i = 0; i < num; i++) {
×
UNCOV
1359
      code = mergeScanRange(pRangeArray, &tmpRange, key.groupId, pUids[i], &index, &merge, pTsDataState->pState->pTaskIdStr);
×
UNCOV
1360
      QUERY_CHECK_CODE(code, lino, _end);
×
1361
    }
UNCOV
1362
    if (merge == true) {
×
UNCOV
1363
      code = streamStateSessionDel_rocksdb(pTsDataState->pState, &key);
×
UNCOV
1364
      QUERY_CHECK_CODE(code, lino, _end);
×
1365
    }
1366
  }
1367

UNCOV
1368
_end:
×
UNCOV
1369
  streamStateFreeCur(pCur);
×
UNCOV
1370
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1371
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1372
  }
UNCOV
1373
  return code;
×
1374
}
1375

1376
int32_t popScanRange(STableTsDataState* pTsDataState, SScanRange* pRange) {
1✔
1377
  int32_t code = TSDB_CODE_SUCCESS;
1✔
1378
  int32_t lino = 0;
1✔
1379
  SStreamStateCur* pCur = NULL;
1✔
1380
  SArray* pRangeArray = pTsDataState->pScanRanges;
1✔
1381
  if (taosArrayGetSize(pRangeArray) > 0) {
1!
UNCOV
1382
    (*pRange) = *(SScanRange*) taosArrayPop(pRangeArray);
×
UNCOV
1383
    goto _end;
×
1384
  }
1385

1386
  {
1387
    pCur = streamStateSessionSeekToLast_rocksdb(pTsDataState->pState, INT64_MAX);
1✔
1388
    SRecDataInfo* pRecVal = NULL;
1✔
1389
    int32_t       vlen = 0;
1✔
1390
    SSessionKey   key = {0};
1✔
1391
    int32_t winRes = streamStateSessionGetKVByCur_rocksdb(pTsDataState->pState, pCur, &key, (void**)&pRecVal, &vlen);
1✔
1392
    if (winRes != TSDB_CODE_SUCCESS) {
1!
1393
      goto _end;
1✔
1394
    }
UNCOV
1395
    qDebug("===stream===get scan range from disc start ts:%" PRId64 ",end ts:%" PRId64 ",group id:%" PRIu64,
×
1396
           key.win.skey, key.win.ekey, key.groupId);
1397

UNCOV
1398
    pRange->win = key.win;
×
UNCOV
1399
    pRange->calWin = pRecVal->calWin;
×
UNCOV
1400
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
UNCOV
1401
    if (pRange->pGroupIds == NULL) {
×
UNCOV
1402
      pRange->pGroupIds = tSimpleHashInit(8, hashFn);
×
1403
    }
UNCOV
1404
    if (pRange->pUIds == NULL) {
×
UNCOV
1405
      pRange->pUIds = tSimpleHashInit(8, hashFn);
×
1406
    }
UNCOV
1407
    code = putRangeIdInfo(pRange, key.groupId, pRecVal->tableUid);
×
UNCOV
1408
    QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1409
    code = streamStateSessionDel_rocksdb(pTsDataState->pState, &key);
×
UNCOV
1410
    QUERY_CHECK_CODE(code, lino, _end);
×
1411
  }
1412

1413
_end:
1✔
1414
  streamStateFreeCur(pCur);
1✔
1415
  if (code != TSDB_CODE_SUCCESS) {
1!
UNCOV
1416
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1417
  }
1418
  return code;
1✔
1419
}
1420

UNCOV
1421
int32_t getNLastSessionStateKVByCur(SStreamStateCur* pCur, int32_t num, SArray* pRes) {
×
UNCOV
1422
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1423
  int32_t lino = 0;
×
UNCOV
1424
  if (pCur->pHashData == NULL) {
×
UNCOV
1425
    return TSDB_CODE_FAILED;
×
1426
  }
UNCOV
1427
  SArray*  pWinStates = *((void**)pCur->pHashData);
×
UNCOV
1428
  int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
1429
  if (size == 0) {
×
UNCOV
1430
    return TSDB_CODE_FAILED;
×
1431
  }
1432

UNCOV
1433
  int32_t i = TMAX(size - num, 0);
×
1434

UNCOV
1435
  for ( ; i < size; i++) {
×
UNCOV
1436
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
×
UNCOV
1437
    QUERY_CHECK_NULL(pPos, code, lino, _end, terrno);
×
1438

UNCOV
1439
    SResultWindowInfo winInfo = {0};
×
UNCOV
1440
    winInfo.pStatePos = pPos;
×
UNCOV
1441
    winInfo.sessionWin = *(SSessionKey*)pPos->pKey;
×
1442

UNCOV
1443
    void* pTempRes = taosArrayPush(pRes, &winInfo);
×
UNCOV
1444
    QUERY_CHECK_NULL(pTempRes, code, lino, _end, terrno);
×
1445
  }
1446

UNCOV
1447
_end:
×
UNCOV
1448
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1449
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1450
  }
UNCOV
1451
  return code;
×
1452
}
1453

UNCOV
1454
bool hasSessionState(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, bool* pIsLast) {
×
UNCOV
1455
  SSHashObj*   pRowStateBuff = getRowStateBuff(pFileState);
×
UNCOV
1456
  void**       ppBuff = (void**)tSimpleHashGet(pRowStateBuff, &pKey->groupId, sizeof(uint64_t));
×
UNCOV
1457
  if (ppBuff == NULL) {
×
UNCOV
1458
    return false;
×
1459
  }
UNCOV
1460
  SArray*      pWinStates = (SArray*)(*ppBuff);
×
UNCOV
1461
  int32_t      size = taosArrayGetSize(pWinStates);
×
UNCOV
1462
  int32_t      index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
×
UNCOV
1463
  SRowBuffPos* pPos = NULL;
×
1464

UNCOV
1465
  if (index >= 0) {
×
UNCOV
1466
    pPos = taosArrayGetP(pWinStates, index);
×
UNCOV
1467
    if (inSessionWindow(pPos->pKey, pKey->win.skey, gap)) {
×
UNCOV
1468
      *pKey = *((SSessionKey*)pPos->pKey);
×
UNCOV
1469
      *pIsLast = (index == size - 1);
×
UNCOV
1470
      return true;
×
1471
    }
1472
  }
1473

UNCOV
1474
  if (index + 1 < size) {
×
UNCOV
1475
    pPos = taosArrayGetP(pWinStates, index + 1);
×
UNCOV
1476
    if (inSessionWindow(pPos->pKey, pKey->win.skey, gap) ||
×
UNCOV
1477
        (pKey->win.ekey != INT64_MIN && inSessionWindow(pPos->pKey, pKey->win.ekey, gap))) {
×
UNCOV
1478
      *pKey = *((SSessionKey*)pPos->pKey);
×
UNCOV
1479
      *pIsLast = (index + 1 == size - 1);
×
UNCOV
1480
      return true;
×
1481
    }
1482
  }
UNCOV
1483
  return false;
×
1484
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc