• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3663

19 Mar 2025 09:21AM UTC coverage: 61.664% (-0.6%) from 62.28%
#3663

push

travis-ci

web-flow
docs: add defination of tmq_config_res_t & fix spell error (#30271)

153169 of 318241 branches covered (48.13%)

Branch coverage included in aggregate %.

239405 of 318390 relevant lines covered (75.19%)

5762846.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.29
/source/libs/stream/src/streamSessionState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "tcommon.h"
21
#include "tsimplehash.h"
22

23
#define MAX_SCAN_RANGE_SIZE 102400
24

25
int sessionStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
36,734✔
26
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
36,734✔
27
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
36,723✔
28
  return sessionWinKeyCmpr((SSessionKey*)pWin1, pWin2);
36,723✔
29
}
30

31
int sessionStateRangeKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
1,086✔
32
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
1,086✔
33
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
1,086✔
34
  return sessionRangeKeyCmpr(pWin1, pWin2);
1,086✔
35
}
36

37
// Search a sorted key list for the last position whose element is smaller than or equal
// to `key`.
//
// keyList  opaque container handed unchanged to cmpFn.
// num      number of elements in keyList.
// key      the key to look for.
// cmpFn    cmpFn(key, keyList, pos) returns <0, 0 or >0 when key sorts before, equal to
//          or after the element at pos.
//
// Returns the matching position, or -1 when the list is empty or every element is bigger
// than the key.
//
// Each comparison result is computed once per position and reused; the comparator is
// otherwise consulted in the same order as before (upper bound, lower bound, middle).
int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn) {
  if (num <= 0) return -1;

  int lo = 0;
  int hi = num - 1;

  while (1) {
    // key is at or beyond the current upper bound
    if (cmpFn(key, keyList, hi) >= 0) return hi;

    int cmpLo = cmpFn(key, keyList, lo);
    if (cmpLo == 0) return lo;
    if (cmpLo < 0) return lo - 1;

    // with a consistent comparator the key now lies strictly between lo and hi
    int mid = lo + ((hi - lo + 1) >> 1);
    int cmpMid = cmpFn(key, keyList, mid);
    if (cmpMid < 0) {
      hi = mid - 1;
    } else if (cmpMid > 0) {
      lo = mid + 1;
    } else {
      return mid;
    }
  }
}
63

64
int64_t getSessionWindowEndkey(void* data, int32_t index) {
×
65
  SArray*       pWinInfos = (SArray*)data;
×
66
  SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
×
67
  if (ppos != NULL) {
×
68
    SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
×
69
    return pWin->win.ekey;
×
70
  } else {
71
    return 0;
×
72
  }
73
}
74

75
// True when ts lies inside the window of pKey after widening the window by `gap` on both
// sides, i.e. win.skey - gap <= ts <= win.ekey + gap.
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
  return (ts + gap >= pKey->win.skey) && (ts - gap <= pKey->win.ekey);
}
81

82
// Allocate a stream-state cursor and bind it to pFileState.
// Returns NULL when createStreamStateCursor() fails.
SStreamStateCur* createStateCursor(SStreamFileState* pFileState) {
  SStreamStateCur* pCursor = createStreamStateCursor();
  if (pCursor != NULL) {
    pCursor->pStreamFileState = pFileState;
  }
  return pCursor;
}
90

91
static int32_t addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
5,786✔
92
                                   SRowBuffPos** ppPos) {
93
  int32_t      code = TSDB_CODE_SUCCESS;
5,786✔
94
  int32_t      lino = 0;
5,786✔
95
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
5,786✔
96
  if (!pNewPos || !pNewPos->pRowBuff) {
5,789!
97
    code = TSDB_CODE_OUT_OF_MEMORY;
1✔
98
    QUERY_CHECK_CODE(code, lino, _end);
1!
99
  }
100

101
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
5,789✔
102
  void* tmp = taosArrayPush(pWinInfos, &pNewPos);
5,789✔
103
  if (!tmp) {
5,789!
104
    code = terrno;
×
105
    QUERY_CHECK_CODE(code, lino, _end);
×
106
  }
107
  (*ppPos) = pNewPos;
5,789✔
108

109
_end:
5,789✔
110
  if (code != TSDB_CODE_SUCCESS) {
5,789!
111
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
112
  }
113
  return code;
5,789✔
114
}
115

116
// Obtain a fresh row position from pFileState, stamp it with *pKey and insert it into
// pWinInfos at position `index`. On success the new position is returned through ppPos;
// on failure *ppPos is left untouched and the error code is returned.
static int32_t insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
                                      int32_t index, SRowBuffPos** ppPos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SRowBuffPos* pSlot = getNewRowPosForWrite(pFileState);
  if (pSlot == NULL || pSlot->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pSlot->pKey, pKey, sizeof(SSessionKey));

  if (taosArrayInsert(pWinInfos, index, &pSlot) == NULL) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  *ppPos = pSlot;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
141

142
// Build a stand-alone row position (not added to any window array) for the key *pKey.
//
// p      optional value buffer; when non-NULL, *pVLen bytes are copied from it into the row
//        buffer, otherwise the row buffer is zero-filled for getRowStateRowSize() bytes.
//        p is always released with taosMemoryFree() before returning, on success and failure.
// pVLen  length of p in bytes; only read when p is non-NULL.
//
// The returned position has needFree and beFlushed set to true.
// Returns NULL when no row position / row buffer could be obtained.
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SRowBuffPos* pResPos = getNewRowPosForWrite(pFileState);
  if (pResPos == NULL || pResPos->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pResPos->pKey, pKey, sizeof(SSessionKey));
  pResPos->needFree = true;
  pResPos->beFlushed = true;

  if (p == NULL) {
    int32_t rowSize = getRowStateRowSize(pFileState);
    memset(pResPos->pRowBuff, 0, rowSize);
  } else {
    memcpy(pResPos->pRowBuff, p, *pVLen);
  }

_end:
  taosMemoryFree(p);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    return NULL;
  }
  return pResPos;
}
169

170
// Locate, load or create the session window that the key *pKey belongs to.
//
// Lookup order, as implemented below:
//   1. the in-memory window array of pKey->groupId (created and registered on first use);
//   2. the backing store, through streamStateSessionAddIfNotExist_rocksdb(), when that array
//      is empty or the key sorts before every buffered window;
//   3. otherwise a new window is appended to, or inserted into, the array.
//
// pKey      in/out: when an existing buffered window is matched, *pKey is overwritten with
//           the key of that window.
// gap       tolerance handed to inSessionWindow() and to the backing-store lookup.
// pVal      out: receives the SRowBuffPos* of the resulting window.
// pVLen     out: value length reported by the backing store (only written on that path).
// pWinCode  out: TSDB_CODE_SUCCESS when an existing buffered window was matched, the
//           backing-store result when the window came from there, TSDB_CODE_FAILED when a
//           new window was created.
//
// Returns TSDB_CODE_SUCCESS or an error code from allocation / hash registration.
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen,
                                int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  (*pWinCode) = TSDB_CODE_SUCCESS;
  SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
  SArray*    pWinStates = NULL;
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, POINTER_BYTES);
    if (!pWinStates) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // the hash did not take the array, so nobody else would ever release it
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  TSKEY startTs = pKey->win.skey;
  TSKEY endTs = pKey->win.ekey;

  int32_t size = taosArrayGetSize(pWinStates);
  if (size == 0) {
    // nothing buffered for this group: consult the backing store first
    void*   pFileStore = getStateFileStore(pFileState);
    void*   p = NULL;
    int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
      // createSessionWinBuff() releases p on every path
      (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
      if (!(*pVal)) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        QUERY_CHECK_CODE(code, lino, _end);
      }

      (*pWinCode) = code_file;
      qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
    } else {
      code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
      (*pWinCode) = TSDB_CODE_FAILED;
      taosMemoryFree(p);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }

  // index of the last buffered window whose key is smaller than or equal to pKey, or -1
  int32_t      index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
  SRowBuffPos* pPos = NULL;

  if (index >= 0) {
    // does the start timestamp fall into the window at `index`?
    pPos = taosArrayGetP(pWinStates, index);
    if (inSessionWindow(pPos->pKey, startTs, gap)) {
      (*pVal) = pPos;
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
      pPos->beUsed = true;
      pPos->beFlushed = false;
      *pKey = *pDestWinKey;
      goto _end;
    }
  }

  if (index + 1 < size) {
    // does the start or end timestamp reach the following window?
    pPos = taosArrayGetP(pWinStates, index + 1);
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap))) {
      (*pVal) = pPos;
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
      pPos->beUsed = true;
      pPos->beFlushed = false;
      *pKey = *pDestWinKey;
      goto _end;
    }
  }

  if (index + 1 == 0) {
    // the key sorts before every buffered window: it may already live in the backing store
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
      void*   p = NULL;
      void*   pFileStore = getStateFileStore(pFileState);
      int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
        (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
        if (!(*pVal)) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          QUERY_CHECK_CODE(code, lino, _end);
        }

        (*pWinCode) = code_file;
        qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
        goto _end;
      } else {
        taosMemoryFree(p);
      }
    }
  }

  if (index == size - 1) {
    // the key sorts after every buffered window: append
    code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
    QUERY_CHECK_CODE(code, lino, _end);

    (*pWinCode) = TSDB_CODE_FAILED;
    goto _end;
  }

  // otherwise insert right behind the window found by the search
  code = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1, (SRowBuffPos**)pVal);
  QUERY_CHECK_CODE(code, lino, _end);

  (*pWinCode) = TSDB_CODE_FAILED;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
284

285
// Adapter around getSessionWinResultBuff() for callers holding an SWinKey: the key is turned
// into a session key whose window starts and ends at pKey->ts, and the lookup uses gap 0.
// keyLen is not used.
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                          int32_t* pWinCode) {
  SWinKey*    pPoint = pKey;
  SSessionKey winKey = {.groupId = pPoint->groupId, .win.skey = pPoint->ts, .win.ekey = pPoint->ts};
  return getSessionWinResultBuff(pFileState, &winKey, 0, pVal, pVLen, pWinCode);
}
291

292
// Register an existing row position in the in-memory window array of its group
// (taken from the groupId of pPos->pKey), creating and registering the array on first use.
//
// pPos->needFree is cleared on every path, including the error paths.
// Returns TSDB_CODE_SUCCESS or the error code of the failing allocation / insertion.
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
  SSessionKey* pKey = pPos->pKey;
  SArray*      pWinStates = NULL;
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, POINTER_BYTES);
    if (!pWinStates) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // the hash did not take the array, so nobody else would ever release it
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  int32_t size = taosArrayGetSize(pWinStates);
  if (size == 0) {
    void* tmp = taosArrayPush(pWinStates, &pPos);
    if (!tmp) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }

  // index of the last buffered window whose key is smaller than or equal to pKey, or -1
  int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);

  // NOTE(review): for index >= 0 the position is inserted AT index, i.e. in front of an
  // element that compares <= pKey; getSessionWinResultBuff() inserts at index + 1 in the
  // same situation. Behavior is kept as-is here - confirm whether index + 1 is intended.
  int32_t insertAt = (index >= 0) ? index : 0;
  void*   tmp = taosArrayInsert(pWinStates, insertAt, &pPos);
  if (!tmp) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  pPos->needFree = false;
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
345

346
// Load the session window stored under *pKey from the backing store into a fresh row
// position (needFree and beFlushed set to true).
//
// pVal      out: receives the new SRowBuffPos*; only written when the store lookup succeeds.
// pVLen     out: value length reported by streamStateSessionGet_rocksdb().
// pWinCode  out: result of streamStateSessionGet_rocksdb().
//
// Returns TSDB_CODE_OUT_OF_MEMORY when no row position could be obtained, otherwise
// TSDB_CODE_SUCCESS - also when the store lookup failed (see pWinCode).
//
// NOTE(review): when the store lookup fails, the row position obtained above is not handed
// to the caller and is not released in this function - confirm that this is intended.
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen,
                              int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SRowBuffPos* pLoaded = getNewRowPosForWrite(pFileState);
  if (pLoaded == NULL || pLoaded->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  pLoaded->needFree = true;
  pLoaded->beFlushed = true;

  void* pBuff = NULL;
  (*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
  if ((*pWinCode) == TSDB_CODE_SUCCESS) {
    memcpy(pLoaded->pKey, pKey, sizeof(SSessionKey));
    memcpy(pLoaded->pRowBuff, pBuff, *pVLen);
    taosMemoryFreeClear(pBuff);
    (*pVal) = pLoaded;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
373

374
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) {
1,964✔
375
  SSHashObj*   pSessionBuff = (SSHashObj*)pBuff;
1,964✔
376
  SSessionKey* pWinKey = (SSessionKey*)key;
1,964✔
377
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
1,964✔
378
  if (!ppBuff) {
1,966✔
379
    return TSDB_CODE_SUCCESS;
509✔
380
  }
381
  SArray* pWinStates = (SArray*)(*ppBuff);
1,457✔
382
  int32_t size = taosArrayGetSize(pWinStates);
1,457✔
383
  TSKEY   gap = 0;
1,457✔
384
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
1,457✔
385
  if (index >= 0) {
1,457✔
386
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
1,031✔
387
    if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) {
1,031!
388
      pPos->beFlushed = true;
1,031✔
389
      taosArrayRemove(pWinStates, index);
1,031✔
390
    }
391
  }
392
  return TSDB_CODE_SUCCESS;
1,457✔
393
}
394

395
// Remove pPos from the in-memory window array of its group, but only when the array slot
// found for its key holds this very position (pointer identity).
void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
  SSessionKey* pWinKey = (SSessionKey*)pPos->pKey;
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
  if (ppBuff == NULL) {
    return;
  }

  SArray* pWinStates = (SArray*)(*ppBuff);
  int32_t numOfWins = taosArrayGetSize(pWinStates);
  int32_t index = binarySearch(pWinStates, numOfWins, pWinKey, sessionStateKeyCompare);
  if (index < 0) {
    return;
  }

  SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index);
  if (pItemPos == pPos) {
    taosArrayRemove(pWinStates, index);
  }
}
413

414
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
448✔
415
                                           const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
416
  int32_t      code = TSDB_CODE_SUCCESS;
448✔
417
  int32_t      lino = 0;
448✔
418
  SRowBuffPos* pNewPos = NULL;
448✔
419
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
448✔
420
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
448✔
421
  SArray*      pWinStates = NULL;
448✔
422
  if (!ppBuff) {
448✔
423
    pWinStates = taosArrayInit(16, POINTER_BYTES);
92✔
424
    if (!pWinStates) {
92!
425
      code = terrno;
×
426
      QUERY_CHECK_CODE(code, lino, _end);
×
427
    }
428

429
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
92✔
430
    QUERY_CHECK_CODE(code, lino, _end);
92!
431
  } else {
432
    pWinStates = (SArray*)(*ppBuff);
356✔
433
  }
434
  if (!pCur) {
448✔
435
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
95✔
436
    QUERY_CHECK_CODE(code, lino, _end);
95!
437

438
    goto _end;
95✔
439
  }
440

441
  int32_t size = taosArrayGetSize(pWinStates);
353✔
442
  if (pCur->buffIndex >= 0) {
353✔
443
    if (pCur->buffIndex >= size) {
339✔
444
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
315✔
445
      QUERY_CHECK_CODE(code, lino, _end);
315!
446

447
      goto _end;
315✔
448
    }
449
    code = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex, &pNewPos);
24✔
450
    QUERY_CHECK_CODE(code, lino, _end);
24!
451

452
    goto _end;
24✔
453
  } else {
454
    if (size > 0) {
14✔
455
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
12✔
456
      if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >= 0) {
12!
457
        // pCur is invalid
458
        SSessionKey pTmpKey = *pWinKey;
×
459
        int32_t     winCode = TSDB_CODE_SUCCESS;
×
460
        code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen, &winCode);
×
461
        QUERY_CHECK_CONDITION((winCode == TSDB_CODE_FAILED), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
462
        QUERY_CHECK_CODE(code, lino, _end);
×
463
        goto _end;
×
464
      }
465
    }
466
    pNewPos = getNewRowPosForWrite(pFileState);
14✔
467
    if (!pNewPos || !pNewPos->pRowBuff) {
14!
468
      code = TSDB_CODE_OUT_OF_MEMORY;
×
469
      QUERY_CHECK_CODE(code, lino, _end);
×
470
    }
471

472
    memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
14✔
473
    pNewPos->needFree = true;
14✔
474
    pNewPos->beFlushed = true;
14✔
475
  }
476

477
_end:
448✔
478
  (*ppVal) = pNewPos;
448✔
479
  if (code != TSDB_CODE_SUCCESS) {
448!
480
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
481
  }
482
  return code;
448✔
483
}
484

485
// Zero the row buffer of every window buffered in memory, across all groups.
// The windows themselves stay in their arrays.
void sessionWinStateClear(SStreamFileState* pFileState) {
  int32_t rowSize = getRowStateRowSize(pFileState);
  void*   pGroups = getRowStateBuff(pFileState);
  void*   pEntry = NULL;
  int32_t iter = 0;

  while ((pEntry = tSimpleHashIterate(pGroups, pEntry, &iter)) != NULL) {
    SArray* pWinStates = *((void**)pEntry);
    int32_t numOfWins = taosArrayGetSize(pWinStates);
    for (int32_t i = 0; i < numOfWins; ++i) {
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
      memset(pPos->pRowBuff, 0, rowSize);
    }
  }
}
517✔
500

501
// Hash value destructor: ptr points at a stored SArray pointer, which is destroyed.
void freeArrayPtr(void* ptr) {
  taosArrayDestroy(*(void**)ptr);
}
3,594✔
505

506
void sessionWinStateCleanup(void* pBuff) {
7,860✔
507
  if (pBuff == NULL) {
7,860✔
508
    return;
5,646✔
509
  }
510
  tSimpleHashSetFreeFp(pBuff, freeArrayPtr);
2,214✔
511
  tSimpleHashCleanup(pBuff);
2,214✔
512
}
513

514
static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, const SSessionKey* pWinKey,
13,759✔
515
                                                SArray** pWins, int32_t* pIndex) {
516
  SStreamStateCur* pCur = NULL;
13,759✔
517
  SSHashObj*       pSessionBuff = getRowStateBuff(pFileState);
13,759✔
518
  void**           ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
13,760✔
519
  if (!ppBuff) {
13,766✔
520
    return NULL;
1,115✔
521
  }
522

523
  SArray* pWinStates = (SArray*)(*ppBuff);
12,651✔
524
  int32_t size = taosArrayGetSize(pWinStates);
12,651✔
525
  TSKEY   gap = 0;
12,650✔
526
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
12,650✔
527

528
  if (pWins) {
12,649✔
529
    (*pWins) = pWinStates;
7,992✔
530
  }
531

532
  if (size > 0 && index == -1) {
12,649✔
533
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
1,708✔
534
    SSessionKey* pWin = (SSessionKey*)pPos->pKey;
1,708✔
535
    if (pWinKey->win.skey == pWin->win.skey) {
1,708✔
536
      index = 0;
294✔
537
    }
538
  }
539

540
  if (index >= 0) {
12,649✔
541
    pCur = createStateCursor(pFileState);
9,985✔
542
    if (pCur == NULL) {
9,986!
543
      return NULL;
×
544
    }
545
    pCur->buffIndex = index;
9,986✔
546
    if (pIndex) {
9,986✔
547
      *pIndex = index;
6,754✔
548
    }
549
  }
550

551
  return pCur;
12,650✔
552
}
553

554
// Position a cursor on the window at or before pWinKey. The in-memory buffer is tried first;
// when it yields nothing, the backing store is asked and the resulting cursor is marked as a
// store cursor (buffIndex = -1). Returns NULL when neither source has such a window.
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, NULL, NULL);
  if (pCur != NULL) {
    return pCur;
  }

  pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(getStateFileStore(pFileState), pWinKey);
  if (pCur != NULL) {
    pCur->buffIndex = -1;
    pCur->pStreamFileState = pFileState;
  }
  return pCur;
}
569

570
// Position a cursor on the window strictly before pWinKey.
//
// The in-memory buffer is tried first: a hit that range-compares below the key is returned
// directly; a hit that range-compares equal is stepped back by one when it is not the first
// element. Every other buffer result is discarded and the backing store is asked instead;
// that cursor is marked as a store cursor (buffIndex = -1).
// Returns NULL when no such window is found.
SStreamStateCur* sessionWinStateSeekKeyPrev(SStreamFileState *pFileState, const SSessionKey *pWinKey) {
  SArray*          pWinStates = NULL;
  int32_t          index = -1;
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
  if (pCur != NULL) {
    int32_t cmpRes = sessionStateRangeKeyCompare(pWinKey, pWinStates, index);
    if (cmpRes > 0) {
      return pCur;
    }
    if (cmpRes == 0 && index > 0) {
      sessionWinStateMoveToPrev(pCur);
      return pCur;
    }
    streamStateFreeCur(pCur);
  }

  pCur = streamStateSessionSeekKeyPrev_rocksdb(getStateFileStore(pFileState), pWinKey);
  if (pCur == NULL) {
    return NULL;
  }
  pCur->buffIndex = -1;
  pCur->pStreamFileState = pFileState;
  return pCur;
}
595

596
// Turn pCur into a buffer cursor: it is reset with streamStateResetCur(), pointed at the first
// buffered window (buffIndex = 0) and bound to pFileState. A NULL cursor is ignored.
static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) {
  if (pCur != NULL) {
    streamStateResetCur(pCur);
    pCur->buffIndex = 0;
    pCur->pStreamFileState = pFileState;
  }
}
604

605
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
1,831✔
606
                                    SStreamStateCur** ppCur) {
607
  SSessionKey key = {.groupId = groupId};
1,831✔
608
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), *ppCur, &key, NULL, NULL);
1,831✔
609
  if (taosArrayGetSize(pWinStates) > 0 &&
1,831✔
610
      (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
105!
611
    if (!(*ppCur)) {
471✔
612
      (*ppCur) = createStateCursor(pFileState);
467✔
613
    }
614
    transformCursor(pFileState, *ppCur);
469✔
615
  } else if (*ppCur) {
1,361✔
616
    (*ppCur)->buffIndex = -1;
542✔
617
    (*ppCur)->pStreamFileState = pFileState;
542✔
618
  }
619
}
1,831✔
620

621
// Position a cursor on the window at or after pWinKey.
// A buffer hit that range-compares below the key is advanced by one step; without a buffer
// hit the backing store is asked and checkAndTransformCursor() picks between the store cursor
// and the first buffered window.
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
  SArray*          pWinStates = NULL;
  int32_t          index = -1;
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
  if (pCur != NULL) {
    int32_t cmpRes = sessionStateRangeKeyCompare(pWinKey, pWinStates, index);
    if (cmpRes > 0) {
      sessionWinStateMoveToNext(pCur);
    }
    return pCur;
  }

  pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(getStateFileStore(pFileState), (SSessionKey*)pWinKey);
  checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
  return pCur;
}
637

638
// Position a cursor on the window after pWinKey.
// A buffer hit is advanced by one step and returned; without a buffer hit the backing store
// is asked and checkAndTransformCursor() picks between the store cursor and the first
// buffered window.
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
  SArray*          pWinStates = NULL;
  int32_t          index = -1;
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
  if (pCur != NULL) {
    sessionWinStateMoveToNext(pCur);
    return pCur;
  }

  pCur = streamStateSessionSeekKeyNext_rocksdb(getStateFileStore(pFileState), pWinKey);
  checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
  return pCur;
}
652

653
// Walk backwards from pWinKey over count windows and return a cursor on the first window
// that follows the nearest preceding window holding at least `count` rows.
//
// The row counter of a window is read as a COUNT_TYPE from the last sizeof(COUNT_TYPE) bytes
// of its row buffer (row size taken from getRowStateRowSize()).
//
// Buffered windows are scanned first. If such a preceding window is found there, the cursor
// is returned right away; otherwise the backing store is scanned the same way and, when it
// has a cursor to offer, the buffer cursor is released and the store cursor is returned.
// Returns NULL when neither source provides a cursor.
SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey, COUNT_TYPE count) {
  SArray*          pWinStates = NULL;
  int32_t          index = -1;
  SStreamStateCur* pBuffCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
  int32_t          rowSize = getRowStateRowSize(pFileState);
  COUNT_TYPE       winCount = 0;

  if (pBuffCur != NULL) {
    for (; index >= 0; index--) {
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
      winCount = *((COUNT_TYPE*)((char*)pPos->pRowBuff + (rowSize - sizeof(COUNT_TYPE))));
      bool keepGoing = (sessionStateRangeKeyCompare(pWinKey, pWinStates, index) == 0 || winCount < count);
      if (!keepGoing) {
        // the window at `index` is full and is not the searched one: start right behind it
        pBuffCur->buffIndex = index + 1;
        return pBuffCur;
      }
    }
    // every buffered window up to the key qualifies: start at the first one
    pBuffCur->buffIndex = 0;
  } else if (taosArrayGetSize(pWinStates) > 0) {
    pBuffCur = createStateCursor(pFileState);
    if (pBuffCur == NULL) {
      return NULL;
    }
    pBuffCur->buffIndex = 0;
  }

  void*            pFileStore = getStateFileStore(pFileState);
  SStreamStateCur* pStoreCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
  if (pStoreCur == NULL) {
    return pBuffCur;
  }

  pStoreCur->pStreamFileState = pFileState;
  SSessionKey key = {0};
  void*       pVal = NULL;
  int         len = 0;
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pStoreCur, &key, &pVal, &len);
  if (code == TSDB_CODE_FAILED) {
    streamStateFreeCur(pStoreCur);
    return pBuffCur;
  }

  // NOTE(review): the counter is read at a fixed offset without looking at `len` - confirm
  // that stored values are always rowSize bytes long.
  winCount = *((COUNT_TYPE*)((char*)pVal + (rowSize - sizeof(COUNT_TYPE))));
  taosMemoryFreeClear(pVal);
  streamStateFreeCur(pBuffCur);
  if (sessionRangeKeyCmpr(pWinKey, &key) != 0 && winCount == count) {
    streamStateCurNext(pFileStore, pStoreCur);
    return pStoreCur;
  }

  streamStateCurPrev(pFileStore, pStoreCur);
  while (1) {
    // NOTE(review): unlike the call above, no file store is passed here - confirm intended.
    code = streamStateSessionGetKVByCur_rocksdb(NULL, pStoreCur, &key, &pVal, &len);
    if (code == TSDB_CODE_FAILED) {
      streamStateCurNext(pFileStore, pStoreCur);
      return pStoreCur;
    }
    winCount = *((COUNT_TYPE*)((char*)pVal + (rowSize - sizeof(COUNT_TYPE))));
    taosMemoryFreeClear(pVal);
    if (sessionRangeKeyCmpr(pWinKey, &key) == 0 || winCount < count) {
      streamStateCurPrev(pFileStore, pStoreCur);
    } else {
      streamStateCurNext(pFileStore, pStoreCur);
      return pStoreCur;
    }
  }
}
717

718
// Read the window the cursor currently points to.
//
// pKey   in/out: pKey->groupId selects the group; on success *pKey is overwritten with the
//        key of the window.
// pVal   optional out: receives an SRowBuffPos*. For a buffer cursor this is the buffered
//        position itself; for a store cursor it is a new position (needFree / beFlushed set)
//        filled with the stored value.
// pVLen  out: value length, written by the backing-store path only.
//
// A store cursor (buffIndex < 0) is switched over to the first buffered window when the
// buffer is not empty and the store has no entry or its entry does not sort before that
// first window.
//
// Returns TSDB_CODE_FAILED for a NULL or exhausted cursor, TSDB_CODE_OUT_OF_MEMORY when no
// row position could be obtained, otherwise the result of the lookup.
int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return TSDB_CODE_FAILED;
  }

  SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState);
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
  SArray*    pWinStates = (ppBuff != NULL) ? (SArray*)(*ppBuff) : NULL;

  if (pCur->buffIndex >= 0) {
    // buffer cursor
    int32_t numOfWins = taosArrayGetSize(pWinStates);
    if (pCur->buffIndex >= numOfWins) {
      return TSDB_CODE_FAILED;
    }
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
    if (pVal != NULL) {
      *pVal = pPos;
    }
    *pKey = *(SSessionKey*)(pPos->pKey);
    return TSDB_CODE_SUCCESS;
  }

  // store cursor
  void*   pData = NULL;
  int32_t code =
      streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, &pData, pVLen);

  bool useBuff = taosArrayGetSize(pWinStates) > 0 &&
                 (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0);
  if (useBuff) {
    transformCursor(pCur->pStreamFileState, pCur);
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
    if (pVal != NULL) {
      *pVal = pPos;
    }
    *pKey = *(SSessionKey*)(pPos->pKey);
    code = TSDB_CODE_SUCCESS;
  } else if (code == TSDB_CODE_SUCCESS && pVal != NULL) {
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState);
    if (pNewPos == NULL || pNewPos->pRowBuff == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFreeClear(pData);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
    memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
    pNewPos->needFree = true;
    pNewPos->beFlushed = true;
    memcpy(pNewPos->pRowBuff, pData, *pVLen);
    (*pVal) = pNewPos;
  }

  taosMemoryFreeClear(pData);
  return code;
}
771

772
// Advance the cursor by one window: a buffer cursor (buffIndex >= 0) gets its index
// incremented, anything else - including a NULL cursor - is handed to
// streamStateCurNext_rocksdb().
void sessionWinStateMoveToNext(SStreamStateCur* pCur) {
  qTrace("move cursor to next");
  bool isBuffCur = (pCur != NULL) && (pCur->buffIndex >= 0);
  if (!isBuffCur) {
    streamStateCurNext_rocksdb(pCur);
    return;
  }
  pCur->buffIndex++;
}
11,497✔
780

781
// Moves the cursor one entry backward.
// A non-NULL cursor whose buffIndex is >= 1 is stepped back by decrementing buffIndex; every other
// cursor (including NULL, and buffIndex == 0) is handed to streamStateCurPrev_rocksdb().
void sessionWinStateMoveToPrev(SStreamStateCur* pCur) {
  qTrace("move cursor to prev");

  if (pCur == NULL || pCur->buffIndex < 1) {
    streamStateCurPrev_rocksdb(pCur);
    return;
  }

  pCur->buffIndex--;
}
×
789

790
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey,
4,417✔
791
                                     range_cmpr_fn cmpFn) {
792
  SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key);
4,417✔
793
  SSessionKey      tmpKey = *key;
4,419✔
794
  int32_t          code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
4,419✔
795
  bool             hasCurrentPrev = true;
4,423✔
796
  if (code == TSDB_CODE_FAILED) {
4,423✔
797
    streamStateFreeCur(pCur);
853✔
798
    pCur = sessionWinStateSeekKeyNext(pFileState, key);
854✔
799
    code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
852✔
800
    hasCurrentPrev = false;
854✔
801
  }
802

803
  if (code == TSDB_CODE_FAILED) {
4,424✔
804
    code = TSDB_CODE_FAILED;
606✔
805
    goto _end;
606✔
806
  }
807

808
  if (cmpFn(key, &tmpKey) == 0) {
3,818✔
809
    *curKey = tmpKey;
2,803✔
810
    goto _end;
2,803✔
811
  } else if (!hasCurrentPrev) {
1,012✔
812
    code = TSDB_CODE_FAILED;
224✔
813
    goto _end;
224✔
814
  }
815

816
  sessionWinStateMoveToNext(pCur);
788✔
817
  code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
789✔
818
  if (code == TSDB_CODE_SUCCESS && cmpFn(key, &tmpKey) == 0) {
790✔
819
    *curKey = tmpKey;
348✔
820
  } else {
821
    code = TSDB_CODE_FAILED;
442✔
822
  }
823

824
_end:
4,423✔
825
  streamStateFreeCur(pCur);
4,423✔
826
  return code;
4,423✔
827
}
828

829
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
3,203✔
830
                              state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
831
  int32_t code = TSDB_CODE_SUCCESS;
3,203✔
832
  int32_t lino = 0;
3,203✔
833
  (*pWinCode) = TSDB_CODE_SUCCESS;
3,203✔
834

835
  SSessionKey* pWinKey = key;
3,203✔
836
  TSKEY        gap = 0;
3,203✔
837
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
3,203✔
838
  SArray*      pWinStates = NULL;
3,203✔
839

840
  void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
3,203✔
841
  if (ppBuff) {
3,203✔
842
    pWinStates = (SArray*)(*ppBuff);
2,773✔
843
  } else {
844
    pWinStates = taosArrayInit(16, POINTER_BYTES);
430✔
845
    if (!pWinStates) {
430!
846
      code = terrno;
×
847
      QUERY_CHECK_CODE(code, lino, _end);
×
848
    }
849

850
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
430✔
851
    QUERY_CHECK_CODE(code, lino, _end);
430!
852
  }
853

854
  TSKEY startTs = pWinKey->win.skey;
3,203✔
855
  TSKEY endTs = pWinKey->win.ekey;
3,203✔
856

857
  int32_t size = taosArrayGetSize(pWinStates);
3,203✔
858
  if (size == 0) {
3,203✔
859
    void*   pFileStore = getStateFileStore(pFileState);
741✔
860
    void*   p = NULL;
741✔
861
    int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
741✔
862
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
741✔
863
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
22✔
864
      if (!(*pVal)) {
22!
865
        code = TSDB_CODE_OUT_OF_MEMORY;
×
866
        QUERY_CHECK_CODE(code, lino, _end);
×
867
      }
868

869
      (*pWinCode) = code_file;
22✔
870
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
22✔
871
             pWinKey->win.ekey, code_file);
872
    } else {
873
      code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
719✔
874
      (*pWinCode) = TSDB_CODE_FAILED;
719✔
875
      taosMemoryFree(p);
719!
876
      QUERY_CHECK_CODE(code, lino, _end);
719!
877
    }
878
    goto _end;
741✔
879
  }
880

881
  // find the first position which is smaller than the pWinKey
882
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
2,462✔
883
  SRowBuffPos* pPos = NULL;
2,462✔
884
  int32_t      valSize = *pVLen;
2,462✔
885

886
  if (index >= 0) {
2,462✔
887
    pPos = taosArrayGetP(pWinStates, index);
2,395✔
888
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
2,395✔
889
    if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) {
2,395✔
890
      (*pVal) = pPos;
1,950✔
891
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
1,950✔
892
      pPos->beUsed = true;
1,950✔
893
      pPos->beFlushed = false;
1,950✔
894
      *key = *pDestWinKey;
1,950✔
895
      goto _end;
1,950✔
896
    }
897
  }
898

899
  if (index + 1 < size) {
512✔
900
    pPos = taosArrayGetP(pWinStates, index + 1);
95✔
901
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
95✔
902
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ||
188!
903
        fn(pKeyData, stateKey) == true) {
93✔
904
      (*pVal) = pPos;
9✔
905
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
9✔
906
      pPos->beUsed = true;
9✔
907
      pPos->beFlushed = false;
9✔
908
      *key = *pDestWinKey;
9✔
909
      goto _end;
9✔
910
    }
911
  }
912

913
  if (index + 1 == 0) {
503✔
914
    if (!isDeteled(pFileState, endTs)) {
60!
915
      void*   p = NULL;
60✔
916
      void*   pFileStore = getStateFileStore(pFileState);
60✔
917
      int32_t code_file =
918
          streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
60✔
919
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
60✔
920
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
51✔
921
        if (!(*pVal)) {
51!
922
          code = TSDB_CODE_OUT_OF_MEMORY;
×
923
          QUERY_CHECK_CODE(code, lino, _end);
51!
924
        }
925

926
        (*pWinCode) = code_file;
51✔
927
        qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
51✔
928
               pWinKey->win.ekey, code_file);
929
        goto _end;
51✔
930
      } else {
931
        taosMemoryFree(p);
9!
932
      }
933
    }
934
  }
935

936
  if (index == size - 1) {
452✔
937
    code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
417✔
938
    QUERY_CHECK_CODE(code, lino, _end);
417!
939

940
    (*pWinCode) = TSDB_CODE_FAILED;
417✔
941
    goto _end;
417✔
942
  }
943
  code = insertNewSessionWindow(pFileState, pWinStates, key, index + 1, (SRowBuffPos**)pVal);
35✔
944
  QUERY_CHECK_CODE(code, lino, _end);
35!
945

946
  (*pWinCode) = TSDB_CODE_FAILED;
35✔
947

948
_end:
3,203✔
949
  if (code != TSDB_CODE_SUCCESS) {
3,203!
950
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
951
  }
952
  return code;
3,203✔
953
}
954

955
// Reads one count-window entry from the rocksdb store.
// The entry at the "current or next" seek position of *pKey is tried first; only if that read does not
// return TSDB_CODE_SUCCESS is the "previous" seek position tried. pKey, pVal and pVLen are passed
// straight to streamStateSessionGetKVByCur_rocksdb(), whose result code is returned.
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
  SStreamStateCur* pForwardCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pForwardCur, pKey, pVal, pVLen);
  streamStateFreeCur(pForwardCur);

  if (code != TSDB_CODE_SUCCESS) {
    SStreamStateCur* pBackwardCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
    code = streamStateSessionGetKVByCur_rocksdb(pState, pBackwardCur, pKey, pVal, pVLen);
    streamStateFreeCur(pBackwardCur);
  }

  return code;
}
969

970
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
4,836✔
971
                              int32_t* pVLen, int32_t* pWinCount) {
972
  int32_t code = TSDB_CODE_SUCCESS;
4,836✔
973
  int32_t lino = 0;
4,836✔
974
  (*pWinCount) = TSDB_CODE_SUCCESS;
4,836✔
975

976
  SSessionKey* pWinKey = pKey;
4,836✔
977
  const TSKEY  gap = 0;
4,836✔
978
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
4,836✔
979
  SArray*      pWinStates = NULL;
4,838✔
980
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
4,838✔
981
  if (ppBuff) {
4,840✔
982
    pWinStates = (SArray*)(*ppBuff);
3,508✔
983
  } else {
984
    pWinStates = taosArrayInit(16, POINTER_BYTES);
1,332✔
985
    if (!pWinStates) {
1,332!
986
      code = terrno;
×
987
      QUERY_CHECK_CODE(code, lino, _end);
×
988
    }
989

990
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
1,332✔
991
    QUERY_CHECK_CODE(code, lino, _end);
1,332!
992
  }
993

994
  TSKEY startTs = pWinKey->win.skey;
4,840✔
995
  TSKEY endTs = pWinKey->win.ekey;
4,840✔
996

997
  int32_t size = taosArrayGetSize(pWinStates);
4,840✔
998
  if (size == 0) {
4,840✔
999
    void* pFileStore = getStateFileStore(pFileState);
1,344✔
1000
    void* pRockVal = NULL;
1,344✔
1001
    (*pWinCount) = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
1,344✔
1002
    if ((*pWinCount) == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
1,344!
1003
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
52✔
1004
             pWinKey->win.ekey, (*pWinCount));
1005
      if ((*pWinCount) == TSDB_CODE_SUCCESS) {
52!
1006
        int32_t     valSize = *pVLen;
52✔
1007
        COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
52✔
1008
        if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
52!
1009
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
41✔
1010
          if (!(*pVal)) {
41!
1011
            code = TSDB_CODE_OUT_OF_MEMORY;
×
1012
            QUERY_CHECK_CODE(code, lino, _end);
×
1013
          }
1014

1015
          goto _end;
41✔
1016
        }
1017
      }
1018
      pWinKey->win.skey = startTs;
11✔
1019
      pWinKey->win.ekey = endTs;
11✔
1020
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
11✔
1021
      taosMemoryFree(pRockVal);
11!
1022
      if (!(*pVal)) {
11!
1023
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1024
        QUERY_CHECK_CODE(code, lino, _end);
×
1025
      }
1026
    } else {
1027
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
1,292✔
1028
      QUERY_CHECK_CODE(code, lino, _end);
1,292!
1029

1030
      (*pWinCount) = TSDB_CODE_FAILED;
1,292✔
1031
    }
1032
    goto _end;
1,303✔
1033
  }
1034

1035
  // find the first position which is smaller than the pWinKey
1036
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
3,496✔
1037
  SRowBuffPos* pPos = NULL;
3,496✔
1038
  int32_t      valSize = *pVLen;
3,496✔
1039

1040
  if (index >= 0) {
3,496✔
1041
    pPos = taosArrayGetP(pWinStates, index);
3,487✔
1042
    COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pPos->pRowBuff) + (valSize - sizeof(COUNT_TYPE)));
3,487✔
1043
    if (inSessionWindow(pPos->pKey, startTs, gap) || (index == size - 1 && (*pWinStateCout) < winCount)) {
3,487✔
1044
      (*pVal) = pPos;
2,830✔
1045
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
2,830✔
1046
      pPos->beUsed = true;
2,830✔
1047
      pPos->beFlushed = false;
2,830✔
1048
      *pWinKey = *pDestWinKey;
2,830✔
1049
      goto _end;
2,830✔
1050
    }
1051
  }
1052

1053
  if (index == -1) {
666✔
1054
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) {
9!
1055
      SSessionKey tmpKey = *pWinKey;
×
1056
      void*       pRockVal = NULL;
×
1057
      void*       pFileStore = getStateFileStore(pFileState);
×
1058
      int32_t     code_file = getCountWinStateFromDisc(pFileStore, &tmpKey, &pRockVal, pVLen);
×
1059
      if (code_file == TSDB_CODE_SUCCESS) {
×
1060
        SRowBuffPos* pFirstPos = taosArrayGetP(pWinStates, 0);
×
1061
        SSessionKey* pFirstWinKey = (SSessionKey*)pFirstPos->pKey;
×
1062
        if (tmpKey.win.ekey < pFirstWinKey->win.skey) {
×
1063
          *pWinKey = tmpKey;
×
1064
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
×
1065
          if (!(*pVal)) {
×
1066
            code = TSDB_CODE_OUT_OF_MEMORY;
×
1067
            QUERY_CHECK_CODE(code, lino, _end);
×
1068
          }
1069

1070
          (*pWinCount) = code_file;
×
1071
          qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
1072
                 pWinKey->win.ekey, code_file);
1073
          goto _end;
×
1074
        }
1075
      }
1076
      taosMemoryFree(pRockVal);
×
1077
    }
1078
  }
1079

1080
  if (index + 1 < size) {
666✔
1081
    pPos = taosArrayGetP(pWinStates, index + 1);
43✔
1082
    (*pVal) = pPos;
43✔
1083
    SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
43✔
1084
    pPos->beUsed = true;
43✔
1085
    pPos->beFlushed = false;
43✔
1086
    *pWinKey = *pDestWinKey;
43✔
1087
    goto _end;
43✔
1088
  }
1089

1090
  code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
623✔
1091
  QUERY_CHECK_CODE(code, lino, _end);
623!
1092

1093
  (*pWinCount) = TSDB_CODE_FAILED;
623✔
1094

1095
_end:
4,840✔
1096
  if (code != TSDB_CODE_SUCCESS) {
4,840!
1097
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1098
  }
1099
  return code;
4,840✔
1100
}
1101

1102
// Creates the row buffer for a new count window identified by *pKey.
//
// The per-group array of buffered windows is looked up by pKey->groupId and created on first use.
//   - Array not empty: a new window is appended.
//   - Array empty: disc is consulted through getCountWinStateFromDisc(). If a window is found there,
//     isFlushedState() holds and its counter differs from winCount, the buffer is created from the
//     disc value; in every other case a new window is appended.
//
// The counter of a disc window is read from the last sizeof(COUNT_TYPE) bytes of its *pVLen-byte value.
// *pVal receives the SRowBuffPos. Returns TSDB_CODE_SUCCESS or the error that aborted the operation.
//
// NOTE(review): getCountWinStateFromDisc() receives pKey and may overwrite it, and this function does
// not restore the requested range before appending a window (getCountWinResultBuff does) - confirm
// that this is intended.
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
                                 int32_t* pVLen) {
  SSessionKey* pWinKey = pKey;
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
  SArray*      pWinStates = NULL;
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, POINTER_BYTES);
    if (!pWinStates) {
      // same allocation-failure handling as getStateWinResultBuff / getCountWinResultBuff
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  TSKEY endTs = pWinKey->win.ekey;

  int32_t size = taosArrayGetSize(pWinStates);
  if (size == 0) {
    void* pFileStore = getStateFileStore(pFileState);
    void* pRockVal = NULL;

    int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
    if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) {
      int32_t     valSize = *pVLen;
      COUNT_TYPE* pWinStateCount = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
      if ((*pWinStateCount) == winCount) {
        // the window on disc is already full: its value is not needed, release it before starting a
        // brand-new window (it used to leak on this path)
        taosMemoryFree(pRockVal);
        code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
        QUERY_CHECK_CODE(code, lino, _end);
      } else {
        // pRockVal is handed over to createSessionWinBuff()
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
        if (!(*pVal)) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          QUERY_CHECK_CODE(code, lino, _end);
        }
        qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
               pWinKey->win.ekey, code_file);
      }
    } else {
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
      taosMemoryFree(pRockVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  } else {
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1159

1160
static int compareRangeKey(const void* pKey, const void* data, int index) {
×
1161
  SScanRange* pRange1 = (SScanRange*) pKey;
×
1162
  SScanRange* pRange2 = taosArrayGet((SArray*)data, index);
×
1163
  if (pRange1->win.skey > pRange2->win.skey) {
×
1164
    return 1;
×
1165
  } else if (pRange1->win.skey < pRange2->win.skey) {
×
1166
    return -1;
×
1167
  }
1168

1169
  if (pRange1->win.ekey > pRange2->win.ekey) {
×
1170
    return 1;
×
1171
  } else if (pRange1->win.ekey < pRange2->win.ekey) {
×
1172
    return -1;
×
1173
  }
1174

1175
  return 0;
×
1176
}
1177

1178
static int32_t scanRangeKeyCmpr(const SScanRange* pRange1, const SScanRange* pRange2) {
×
1179
  if (pRange1->win.skey > pRange2->win.ekey) {
×
1180
    return 1;
×
1181
  } else if (pRange1->win.ekey < pRange2->win.skey) {
×
1182
    return -1;
×
1183
  }
1184

1185
  return 0;
×
1186
}
1187

1188
// Records a table uid and a group id in the id sets of a scan range.
// uId is put into pRangeKey->pUIds first; only if that succeeds is gpId put into
// pRangeKey->pGroupIds. Both are stored as keys with no value. Returns the first failing code, or
// TSDB_CODE_SUCCESS.
static int32_t putRangeIdInfo(SScanRange* pRangeKey, uint64_t gpId, uint64_t uId) {
  int32_t lino = 0;
  int32_t code = tSimpleHashPut(pRangeKey->pUIds, &uId, sizeof(uint64_t), NULL, 0);

  if (code != TSDB_CODE_SUCCESS) {
    lino = __LINE__;
  } else {
    code = tSimpleHashPut(pRangeKey->pGroupIds, &gpId, sizeof(uint64_t), NULL, 0);
    if (code != TSDB_CODE_SUCCESS) {
      lino = __LINE__;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1202

1203
// Widens *pRangeDest so that it covers *pRangeSrc as well: for both `win` and `calWin` the start
// becomes the smaller of the two starts and the end the larger of the two ends.
void mergeRangeKey(SScanRange* pRangeDest, SScanRange* pRangeSrc) {
  if (pRangeSrc->win.skey < pRangeDest->win.skey) {
    pRangeDest->win.skey = pRangeSrc->win.skey;
  }
  if (pRangeSrc->win.ekey > pRangeDest->win.ekey) {
    pRangeDest->win.ekey = pRangeSrc->win.ekey;
  }

  if (pRangeSrc->calWin.skey < pRangeDest->calWin.skey) {
    pRangeDest->calWin.skey = pRangeSrc->calWin.skey;
  }
  if (pRangeSrc->calWin.ekey > pRangeDest->calWin.ekey) {
    pRangeDest->calWin.ekey = pRangeSrc->calWin.ekey;
  }
}
×
1209

1210
// Tries to fold *pRangeKey into a range already stored in pRangeArray.
//
// binarySearch() locates a position; the element at that position (when it is >= 0) and the one right
// after it (when it exists) are tested with scanRangeKeyCmpr(). The first one that overlaps is widened
// with mergeRangeKey(), gets gpId / uId recorded, and *pRes is set to true.
// When neither overlaps, *pRes is set to false and *pIndex receives the binarySearch() position.
// *pIndex is not written when a merge happens; *pRes is not written when recording the ids fails.
// idStr is only used in the debug log line.
int32_t mergeScanRange(SArray* pRangeArray, SScanRange* pRangeKey, uint64_t gpId, uint64_t uId, int32_t* pIndex, bool* pRes, char* idStr) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t size = taosArrayGetSize(pRangeArray);
  int32_t index = binarySearch(pRangeArray, size, pRangeKey, compareRangeKey);

  for (int32_t step = 0; step < 2; step++) {
    int32_t pos = index + step;
    // the found position only needs to be non-negative, its successor only needs to exist
    bool usable = (step == 0) ? (pos >= 0) : (pos < size);
    if (!usable) {
      continue;
    }

    SScanRange* pStored = (SScanRange*)taosArrayGet(pRangeArray, pos);
    if (scanRangeKeyCmpr(pStored, pRangeKey) != 0) {
      continue;
    }

    mergeRangeKey(pStored, pRangeKey);
    code = putRangeIdInfo(pStored, gpId, uId);
    QUERY_CHECK_CODE(code, lino, _end);
    *pRes = true;
    goto _end;
  }

  *pRes = false;
  (*pIndex) = index;

_end:
  qDebug("===stream===%s mergeScanRange start ts:%" PRId64 ",end ts:%" PRId64 ",group id:%" PRIu64 ", uid:%" PRIu64
         ", res:%d", idStr, pRangeKey->win.skey, pRangeKey->win.ekey, gpId, uId, *pRes);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1246

1247
// Registers the scan range *pWin for group gpId and table pRecData->tableUid.
//
//   1. mergeScanRange() first tries to fold the range into one already held in
//      pTsDataState->pScanRanges; if that succeeds nothing else is done.
//   2. If the in-memory array holds more than MAX_SCAN_RANGE_SIZE entries, the table uid is written to
//      the rocksdb session store under {*pWin, gpId}: as a new 8-byte value, or appended to the value
//      that is already stored there.
//   3. Otherwise a new SScanRange with its own uid / group-id sets is inserted into the array at the
//      position reported by mergeScanRange() (clamped to 0).
//
// recLen is currently unused. Returns TSDB_CODE_SUCCESS or the first error encountered.
//
// NOTE(review): step 3 inserts at the position returned by binarySearch(), whereas
// getStateWinResultBuff inserts at that position + 1 - confirm the array stays ordered the way
// compareRangeKey expects.
int32_t mergeAndSaveScanRange(STableTsDataState* pTsDataState, STimeWindow* pWin, uint64_t gpId, SRecDataInfo* pRecData, int32_t recLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    rangeStored = false;  // true once rangeKey (and its hash sets) is owned by pRangeArray

  SArray*  pRangeArray = pTsDataState->pScanRanges;
  uint64_t uId = pRecData->tableUid;

  int32_t    index = 0;
  bool       merge = false;
  SScanRange rangeKey = {.win = *pWin, .calWin = pRecData->calWin, .pUIds = NULL, .pGroupIds = NULL};
  code = mergeScanRange(pRangeArray, &rangeKey, gpId, uId, &index, &merge, pTsDataState->pState->pTaskIdStr);
  QUERY_CHECK_CODE(code, lino, _end);
  if (merge == true) {
    goto _end;
  }

  int32_t size = taosArrayGetSize(pRangeArray);
  if (size > MAX_SCAN_RANGE_SIZE) {
    SSessionKey sesKey = {.win = *pWin, .groupId = gpId};
    void*       pVal = NULL;
    int32_t     len = 0;
    int32_t     winCode = streamStateSessionGet_rocksdb(pTsDataState->pState, &sesKey, &pVal, &len);
    if (winCode != TSDB_CODE_SUCCESS) {
      code = streamStateSessionPut_rocksdb(pTsDataState->pState, &sesKey, &uId, sizeof(uint64_t));
    } else {
      char* pTempBuf = taosMemoryRealloc(pVal, len + sizeof(uint64_t));
      if (pTempBuf == NULL) {
        // as with standard realloc, the old buffer is assumed to still be allocated on failure:
        // release it instead of leaking it
        code = terrno;
        taosMemoryFree(pVal);
        lino = __LINE__;
        goto _end;
      }
      memcpy(pTempBuf + len, &uId, sizeof(uint64_t));
      code = streamStateSessionPut_rocksdb(pTsDataState->pState, &sesKey, pTempBuf, len + sizeof(uint64_t));
      taosMemoryFree(pTempBuf);
    }
    QUERY_CHECK_CODE(code, lino, _end);
    goto _end;
  }

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  rangeKey.pGroupIds = tSimpleHashInit(8, hashFn);
  QUERY_CHECK_NULL(rangeKey.pGroupIds, code, lino, _end, terrno);
  rangeKey.pUIds = tSimpleHashInit(8, hashFn);
  QUERY_CHECK_NULL(rangeKey.pUIds, code, lino, _end, terrno);

  code = putRangeIdInfo(&rangeKey, gpId, uId);
  QUERY_CHECK_CODE(code, lino, _end);
  if (index < 0) {
    index = 0;
  }
  void* pInserted = taosArrayInsert(pRangeArray, index, &rangeKey);
  QUERY_CHECK_NULL(pInserted, code, lino, _end, terrno);
  rangeStored = true;

_end:
  if (!rangeStored) {
    // the sets were never handed over to the array; both are still NULL on the merge / disc paths
    if (rangeKey.pGroupIds != NULL) {
      tSimpleHashCleanup(rangeKey.pGroupIds);
    }
    if (rangeKey.pUIds != NULL) {
      tSimpleHashCleanup(rangeKey.pUIds);
    }
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1297

1298
// Copies every entry of pSource into pDest and then destroys pSource.
// Each entry is stored under its original key with sizeof(uint64_t) bytes taken from the iterated
// entry as the value. If a put fails, the copy stops, pSource is NOT destroyed and the error code is
// returned.
//
// NOTE(review): the id sets in this file are filled by putRangeIdInfo() with zero-length values, while
// sizeof(uint64_t) bytes are read from each entry here - confirm against the tsimplehash layout.
int32_t mergeSimpleHashMap(SSHashObj* pDest, SSHashObj* pSource) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pSource, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pSource, pEntry, &iter)) {
    size_t keyLen = 0;
    void*  pEntryKey = tSimpleHashGetKey(pEntry, &keyLen);
    code = tSimpleHashPut(pDest, pEntryKey, keyLen, pEntry, sizeof(uint64_t));
    QUERY_CHECK_CODE(code, lino, _end);
  }
  tSimpleHashCleanup(pSource);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1317

1318
// Collapses overlapping scan ranges.
//
// Returns immediately when fewer than two ranges are buffered in pTsDataState->pScanRanges.
// Pass 1 (memory): adjacent ranges for which scanRangeKeyCmpr() reports an overlap are merged into the
//   earlier element - its `win` is widened, the id sets of the later element are merged into it - and
//   the later element is removed from the array.
// Pass 2 (disc): starting at the last entry of the rocksdb session store and walking backwards, each
//   stored value is read as an array of uint64_t table uids; every uid is offered to mergeScanRange()
//   and the entry is deleted from disc when the last offer was merged.
//
// NOTE(review): pass 1 widens `win` only, while mergeRangeKey() also widens `calWin` - confirm that
// leaving calWin untouched here is intended.
int32_t mergeAllScanRange(STableTsDataState* pTsDataState) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamStateCur* pCur = NULL;
  SArray*          pRangeArray = pTsDataState->pScanRanges;
  if (taosArrayGetSize(pRangeArray) < 2) {
    return code;
  }

  for (int32_t i = 0; i < taosArrayGetSize(pRangeArray) - 1;) {
    SScanRange* pCurRange = taosArrayGet(pRangeArray, i);
    SScanRange* pNextRange = taosArrayGet(pRangeArray, i + 1);
    if (scanRangeKeyCmpr(pCurRange, pNextRange) == 0) {
      pCurRange->win.skey = TMIN(pCurRange->win.skey, pNextRange->win.skey);
      pCurRange->win.ekey = TMAX(pCurRange->win.ekey, pNextRange->win.ekey);
      code = mergeSimpleHashMap(pCurRange->pGroupIds, pNextRange->pGroupIds);
      QUERY_CHECK_CODE(code, lino, _end);
      // mergeSimpleHashMap() destroyed the source set on success; do not leave a dangling pointer in
      // the array element in case the next step fails and the element stays in the array
      pNextRange->pGroupIds = NULL;
      code = mergeSimpleHashMap(pCurRange->pUIds, pNextRange->pUIds);
      QUERY_CHECK_CODE(code, lino, _end);
      pNextRange->pUIds = NULL;
      taosArrayRemove(pRangeArray, i + 1);
      // stay on i: the merged range may overlap the new neighbour as well
      continue;
    }
    i++;
  }

  int32_t winRes = TSDB_CODE_SUCCESS;
  pCur = streamStateSessionSeekToLast_rocksdb(pTsDataState->pState, INT64_MAX);
  while (winRes == TSDB_CODE_SUCCESS) {
    void*       pVal = NULL;
    int32_t     vlen = 0;
    SSessionKey key = {0};
    winRes = streamStateSessionGetKVByCur_rocksdb(pTsDataState->pState, pCur, &key, &pVal, &vlen);
    if (winRes != TSDB_CODE_SUCCESS) {
      taosMemoryFree(pVal);
      break;
    }
    int32_t    index = 0;
    bool       merge = false;
    SScanRange tmpRange = {.win = key.win, .pUIds = NULL, .pGroupIds = NULL};
    int32_t    num = vlen / sizeof(uint64_t);
    uint64_t*  pUids = (uint64_t*)pVal;
    for (int32_t i = 0; i < num; i++) {
      code = mergeScanRange(pRangeArray, &tmpRange, key.groupId, pUids[i], &index, &merge, pTsDataState->pState->pTaskIdStr);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }
    }
    // the value buffer is owned by the caller of streamStateSessionGetKVByCur_rocksdb(); release it
    // on every path before looking at the result
    taosMemoryFree(pVal);
    QUERY_CHECK_CODE(code, lino, _end);

    if (merge == true) {
      code = streamStateSessionDel_rocksdb(pTsDataState->pState, &key);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // reading through the cursor does not move it; step back explicitly, otherwise the same entry
    // would be read again on every iteration
    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  streamStateFreeCur(pCur);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1375

1376
// Removes one pending scan range and returns it in *pRange.
//
//   - If pTsDataState->pScanRanges is not empty, its last element is popped and copied to *pRange.
//   - Otherwise the last entry of the rocksdb session store is read: *pRange gets the stored window,
//     the stored calWin, and - in id sets that are created when *pRange does not carry them yet - the
//     group id and table uid of the entry; the entry is then deleted from disc.
//   - If the store has no entry either, *pRange is left untouched and TSDB_CODE_SUCCESS is returned.
//
// NOTE(review): the stored value is interpreted as SRecDataInfo here, while mergeAndSaveScanRange()
// and mergeAllScanRange() write / read values under the same store as plain uint64_t uid arrays -
// confirm the value layout and whether vlen has to be validated before dereferencing.
int32_t popScanRange(STableTsDataState* pTsDataState, SScanRange* pRange) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamStateCur* pCur = NULL;
  SRecDataInfo*    pRecVal = NULL;
  SArray*          pRangeArray = pTsDataState->pScanRanges;
  if (taosArrayGetSize(pRangeArray) > 0) {
    (*pRange) = *(SScanRange*)taosArrayPop(pRangeArray);
    goto _end;
  }

  {
    pCur = streamStateSessionSeekToLast_rocksdb(pTsDataState->pState, INT64_MAX);
    int32_t     vlen = 0;
    SSessionKey key = {0};
    int32_t winRes = streamStateSessionGetKVByCur_rocksdb(pTsDataState->pState, pCur, &key, (void**)&pRecVal, &vlen);
    if (winRes != TSDB_CODE_SUCCESS) {
      goto _end;
    }
    qDebug("===stream===get scan range from disc start ts:%" PRId64 ",end ts:%" PRId64 ",group id:%" PRIu64,
           key.win.skey, key.win.ekey, key.groupId);

    pRange->win = key.win;
    pRange->calWin = pRecVal->calWin;
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
    if (pRange->pGroupIds == NULL) {
      pRange->pGroupIds = tSimpleHashInit(8, hashFn);
      QUERY_CHECK_NULL(pRange->pGroupIds, code, lino, _end, terrno);
    }
    if (pRange->pUIds == NULL) {
      pRange->pUIds = tSimpleHashInit(8, hashFn);
      QUERY_CHECK_NULL(pRange->pUIds, code, lino, _end, terrno);
    }
    code = putRangeIdInfo(pRange, key.groupId, pRecVal->tableUid);
    QUERY_CHECK_CODE(code, lino, _end);
    code = streamStateSessionDel_rocksdb(pTsDataState->pState, &key);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  // the value buffer returned by streamStateSessionGetKVByCur_rocksdb() is owned by the caller;
  // it stays NULL on the in-memory path
  taosMemoryFree(pRecVal);
  streamStateFreeCur(pCur);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1420

1421
// Appends the last `num` buffered windows of the cursor's group (or all of them, if there are fewer)
// to pRes as SResultWindowInfo entries, oldest of the selection first.
// Each entry carries the row buffer position and a copy of its session key; all other fields are zero.
// Returns TSDB_CODE_FAILED when the cursor has no hash data or the window array is empty, the error
// taken from terrno when a position is NULL or the push fails, and TSDB_CODE_SUCCESS otherwise.
int32_t getNLastSessionStateKVByCur(SStreamStateCur* pCur, int32_t num, SArray* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pCur->pHashData == NULL) {
    return TSDB_CODE_FAILED;
  }

  SArray* pWinStates = *((void**)pCur->pHashData);
  int32_t total = taosArrayGetSize(pWinStates);
  if (total == 0) {
    return TSDB_CODE_FAILED;
  }

  int32_t first = total - num;
  if (first < 0) {
    first = 0;
  }

  for (int32_t pos = first; pos < total; pos++) {
    SRowBuffPos* pWinPos = taosArrayGetP(pWinStates, pos);
    QUERY_CHECK_NULL(pWinPos, code, lino, _end, terrno);

    SResultWindowInfo info = {.pStatePos = pWinPos, .sessionWin = *(SSessionKey*)pWinPos->pKey};

    void* pPushed = taosArrayPush(pRes, &info);
    QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1453

1454
// Tells whether a buffered session window of pKey's group covers pKey.
//
// The element located by binarySearch() is accepted when inSessionWindow() holds for pKey->win.skey;
// the element right after it is accepted when inSessionWindow() holds for pKey->win.skey, or for
// pKey->win.ekey provided ekey is not INT64_MIN.
// On a hit *pKey is overwritten with the buffered window's key, *pIsLast tells whether that window is
// the last one of the group, and true is returned. Otherwise false is returned and neither *pKey nor
// *pIsLast is modified (this includes the case where the group has no buffer at all).
bool hasSessionState(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, bool* pIsLast) {
  SSHashObj* pGroupBuffs = getRowStateBuff(pFileState);
  void**     ppSlot = (void**)tSimpleHashGet(pGroupBuffs, &pKey->groupId, sizeof(uint64_t));
  if (ppSlot == NULL) {
    return false;
  }

  SArray* pWinStates = (SArray*)(*ppSlot);
  int32_t winNum = taosArrayGetSize(pWinStates);
  int32_t idx = binarySearch(pWinStates, winNum, pKey, sessionStateKeyCompare);

  if (idx >= 0) {
    SRowBuffPos* pLeft = taosArrayGetP(pWinStates, idx);
    if (inSessionWindow(pLeft->pKey, pKey->win.skey, gap)) {
      *pKey = *((SSessionKey*)pLeft->pKey);
      *pIsLast = (idx == winNum - 1);
      return true;
    }
  }

  int32_t nextIdx = idx + 1;
  if (nextIdx < winNum) {
    SRowBuffPos* pRight = taosArrayGetP(pWinStates, nextIdx);
    if (inSessionWindow(pRight->pKey, pKey->win.skey, gap) ||
        (pKey->win.ekey != INT64_MIN && inSessionWindow(pRight->pKey, pKey->win.ekey, gap))) {
      *pKey = *((SSessionKey*)pRight->pKey);
      *pIsLast = (nextIdx == winNum - 1);
      return true;
    }
  }

  return false;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc