• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3524

08 Nov 2024 04:27AM UTC coverage: 60.898% (+5.0%) from 55.861%
#3524

push

travis-ci

web-flow
Merge pull request #28647 from taosdata/fix/3.0/TD-32519_drop_ctb

fix TD-32519 drop child table with tsma caused crash

118687 of 248552 branches covered (47.75%)

Branch coverage included in aggregate %.

286 of 337 new or added lines in 18 files covered. (84.87%)

9647 existing lines in 190 files now uncovered.

199106 of 273291 relevant lines covered (72.85%)

15236719.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.25
/source/libs/stream/src/streamSessionState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "tcommon.h"
21
#include "tsimplehash.h"
22

23
int sessionStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
35,953✔
24
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
35,953✔
25
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
35,952✔
26
  return sessionWinKeyCmpr((SSessionKey*)pWin1, pWin2);
35,952✔
27
}
28

29
int sessionStateRangeKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
1,067✔
30
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
1,067✔
31
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
1,067✔
32
  return sessionRangeKeyCmpr(pWin1, pWin2);
1,067✔
33
}
34

35
int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn) {
22,962✔
36
  int firstPos = 0, lastPos = num - 1, midPos = -1;
22,962✔
37
  int numOfRows = 0;
22,962✔
38

39
  if (num <= 0) return -1;
22,962✔
40
  // find the first position which is smaller or equal than the key.
41
  // if all data is bigger than the key return -1
42
  while (1) {
43
    if (cmpFn(key, keyList, lastPos) >= 0) return lastPos;
23,358✔
44
    if (cmpFn(key, keyList, firstPos) == 0) return firstPos;
5,310✔
45
    if (cmpFn(key, keyList, firstPos) < 0) return firstPos - 1;
4,903✔
46

47
    numOfRows = lastPos - firstPos + 1;
2,217✔
48
    midPos = (numOfRows >> 1) + firstPos;
2,217✔
49

50
    if (cmpFn(key, keyList, midPos) < 0) {
2,217✔
51
      lastPos = midPos - 1;
1,347✔
52
    } else if (cmpFn(key, keyList, midPos) > 0) {
874✔
53
      firstPos = midPos + 1;
534✔
54
    } else {
55
      break;
343✔
56
    }
57
  }
58

59
  return midPos;
343✔
60
}
61

UNCOV
62
int64_t getSessionWindowEndkey(void* data, int32_t index) {
×
UNCOV
63
  SArray*       pWinInfos = (SArray*)data;
×
64
  SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
×
65
  if (ppos != NULL) {
×
66
    SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
×
67
    return pWin->win.ekey;
×
68
  } else {
69
    return 0;
×
70
  }
71
}
72

73
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
9,637✔
74
  if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) {
9,637✔
75
    return true;
2,135✔
76
  }
77
  return false;
7,502✔
78
}
79

80
SStreamStateCur* createStateCursor(SStreamFileState* pFileState) {
11,002✔
81
  SStreamStateCur* pCur = createStreamStateCursor();
11,002✔
82
  if (pCur == NULL) {
11,017!
UNCOV
83
    return NULL;
×
84
  }
85
  pCur->pStreamFileState = pFileState;
11,017✔
86
  return pCur;
11,017✔
87
}
88

89
static int32_t addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
5,628✔
90
                                   SRowBuffPos** ppPos) {
91
  int32_t      code = TSDB_CODE_SUCCESS;
5,628✔
92
  int32_t      lino = 0;
5,628✔
93
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
5,628✔
94
  if (!pNewPos || !pNewPos->pRowBuff) {
5,629!
UNCOV
95
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
96
    QUERY_CHECK_CODE(code, lino, _end);
×
97
  }
98

99
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
5,629✔
100
  void* tmp = taosArrayPush(pWinInfos, &pNewPos);
5,629✔
101
  if (!tmp) {
5,629!
UNCOV
102
    code = terrno;
×
UNCOV
103
    QUERY_CHECK_CODE(code, lino, _end);
×
104
  }
105
  (*ppPos) = pNewPos;
5,629✔
106

107
_end:
5,629✔
108
  if (code != TSDB_CODE_SUCCESS) {
5,629!
UNCOV
109
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
110
  }
111
  return code;
5,629✔
112
}
113

114
static int32_t insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
121✔
115
                                      int32_t index, SRowBuffPos** ppPos) {
116
  int32_t      code = TSDB_CODE_SUCCESS;
121✔
117
  int32_t      lino = 0;
121✔
118
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
121✔
119
  if (!pNewPos || !pNewPos->pRowBuff) {
121!
UNCOV
120
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
121
    QUERY_CHECK_CODE(code, lino, _end);
×
122
  }
123

124
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
121✔
125
  void* tmp = taosArrayInsert(pWinInfos, index, &pNewPos);
121✔
126
  if (!tmp) {
121!
UNCOV
127
    code = terrno;
×
UNCOV
128
    QUERY_CHECK_CODE(code, lino, _end);
×
129
  }
130

131
  *ppPos = pNewPos;
121✔
132

133
_end:
121✔
134
  if (code != TSDB_CODE_SUCCESS) {
121!
UNCOV
135
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
136
  }
137
  return code;
121✔
138
}
139

140
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) {
413✔
141
  int32_t      code = TSDB_CODE_SUCCESS;
413✔
142
  int32_t      lino = 0;
413✔
143
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
413✔
144
  if (!pNewPos || !pNewPos->pRowBuff) {
413!
UNCOV
145
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
146
    QUERY_CHECK_CODE(code, lino, _end);
×
147
  }
148

149
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
413✔
150
  pNewPos->needFree = true;
413✔
151
  pNewPos->beFlushed = true;
413✔
152
  if (p) {
413✔
153
    memcpy(pNewPos->pRowBuff, p, *pVLen);
405✔
154
  } else {
155
    int32_t len = getRowStateRowSize(pFileState);
8✔
156
    memset(pNewPos->pRowBuff, 0, len);
8✔
157
  }
158

159
_end:
413✔
160
  taosMemoryFree(p);
413✔
161
  if (code != TSDB_CODE_SUCCESS) {
413!
UNCOV
162
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
163
    return NULL;
×
164
  }
165
  return pNewPos;
413✔
166
}
167

168
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen,
3,416✔
169
                                int32_t* pWinCode) {
170
  int32_t code = TSDB_CODE_SUCCESS;
3,416✔
171
  int32_t lino = 0;
3,416✔
172
  (*pWinCode) = TSDB_CODE_SUCCESS;
3,416✔
173
  SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
3,416✔
174
  SArray*    pWinStates = NULL;
3,418✔
175
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
3,418✔
176
  if (ppBuff) {
3,418✔
177
    pWinStates = (SArray*)(*ppBuff);
2,340✔
178
  } else {
179
    pWinStates = taosArrayInit(16, POINTER_BYTES);
1,078✔
180
    if (!pWinStates) {
1,078!
UNCOV
181
      code = terrno;
×
UNCOV
182
      QUERY_CHECK_CODE(code, lino, _end);
×
183
    }
184
    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
1,078✔
185
    QUERY_CHECK_CODE(code, lino, _end);
1,078!
186
  }
187

188
  TSKEY startTs = pKey->win.skey;
3,418✔
189
  TSKEY endTs = pKey->win.ekey;
3,418✔
190

191
  int32_t size = taosArrayGetSize(pWinStates);
3,418✔
192
  if (size == 0) {
3,418✔
193
    void*   pFileStore = getStateFileStore(pFileState);
1,183✔
194
    void*   p = NULL;
1,183✔
195
    int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
1,183✔
196
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
1,183✔
197
      (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
60✔
198
      if (!(*pVal)) {
60!
UNCOV
199
        code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
200
        QUERY_CHECK_CODE(code, lino, _end);
×
201
      }
202

203
      (*pWinCode) = code_file;
60✔
204
      qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
60✔
205
    } else {
206
      code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
1,123✔
207
      (*pWinCode) = TSDB_CODE_FAILED;
1,123✔
208
      taosMemoryFree(p);
1,123✔
209
      QUERY_CHECK_CODE(code, lino, _end);
1,123!
210
    }
211
    goto _end;
1,183✔
212
  }
213

214
  // find the first position which is smaller than the pKey
215
  int32_t      index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
2,235✔
216
  SRowBuffPos* pPos = NULL;
2,236✔
217

218
  if (index >= 0) {
2,236✔
219
    pPos = taosArrayGetP(pWinStates, index);
1,911✔
220
    if (inSessionWindow(pPos->pKey, startTs, gap)) {
1,910✔
221
      (*pVal) = pPos;
893✔
222
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
893✔
223
      pPos->beUsed = true;
893✔
224
      pPos->beFlushed = false;
893✔
225
      *pKey = *pDestWinKey;
893✔
226
      goto _end;
893✔
227
    }
228
  }
229

230
  if (index + 1 < size) {
1,343✔
231
    pPos = taosArrayGetP(pWinStates, index + 1);
454✔
232
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap))) {
454!
233
      (*pVal) = pPos;
119✔
234
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
119✔
235
      pPos->beUsed = true;
119✔
236
      pPos->beFlushed = false;
119✔
237
      *pKey = *pDestWinKey;
119✔
238
      goto _end;
119✔
239
    }
240
  }
241

242
  if (index + 1 == 0) {
1,224✔
243
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
286!
244
      void*   p = NULL;
261✔
245
      void*   pFileStore = getStateFileStore(pFileState);
261✔
246
      int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
261✔
247
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
261!
248
        (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
261✔
249
        if (!(*pVal)) {
261!
UNCOV
250
          code = TSDB_CODE_OUT_OF_MEMORY;
×
251
          QUERY_CHECK_CODE(code, lino, _end);
261!
252
        }
253

254
        (*pWinCode) = code_file;
261✔
255
        qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
261✔
256
        goto _end;
261✔
257
      } else {
UNCOV
258
        taosMemoryFree(p);
×
259
      }
260
    }
261
  }
262

263
  if (index == size - 1) {
963✔
264
    code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
889✔
265
    QUERY_CHECK_CODE(code, lino, _end);
889!
266

267
    (*pWinCode) = TSDB_CODE_FAILED;
889✔
268
    goto _end;
889✔
269
  }
270

271
  code = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1, (SRowBuffPos**)pVal);
74✔
272
  QUERY_CHECK_CODE(code, lino, _end);
74!
273

274
  (*pWinCode) = TSDB_CODE_FAILED;
74✔
275

276
_end:
3,419✔
277
  if (code != TSDB_CODE_SUCCESS) {
3,419!
UNCOV
278
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
279
  }
280
  return code;
3,419✔
281
}
282

283
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
128✔
284
                          int32_t* pWinCode) {
285
  SWinKey*    pTmpkey = pKey;
128✔
286
  SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts};
128✔
287
  return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen, pWinCode);
128✔
288
}
289

290
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
141✔
291
  int32_t      code = TSDB_CODE_SUCCESS;
141✔
292
  int32_t      lino = 0;
141✔
293
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
141✔
294
  SSessionKey* pKey = pPos->pKey;
141✔
295
  SArray*      pWinStates = NULL;
141✔
296
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
141✔
297
  if (ppBuff) {
141✔
298
    pWinStates = (SArray*)(*ppBuff);
113✔
299
  } else {
300
    pWinStates = taosArrayInit(16, POINTER_BYTES);
28✔
301
    if (!pWinStates) {
28!
UNCOV
302
      code = terrno;
×
UNCOV
303
      QUERY_CHECK_CODE(code, lino, _end);
×
304
    }
305

306
    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
28✔
307
    QUERY_CHECK_CODE(code, lino, _end);
28!
308
  }
309

310
  int32_t size = taosArrayGetSize(pWinStates);
141✔
311
  if (size == 0) {
141✔
312
    void* tmp = taosArrayPush(pWinStates, &pPos);
120✔
313
    if (!tmp) {
120!
UNCOV
314
      code = terrno;
×
UNCOV
315
      QUERY_CHECK_CODE(code, lino, _end);
×
316
    }
317
    goto _end;
120✔
318
  }
319

320
  // find the first position which is smaller than the pKey
321
  int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
21✔
322
  if (index >= 0) {
21!
UNCOV
323
    void* tmp = taosArrayInsert(pWinStates, index, &pPos);
×
UNCOV
324
    if (!tmp) {
×
325
      code = terrno;
×
326
      QUERY_CHECK_CODE(code, lino, _end);
×
327
    }
328
  } else {
329
    void* tmp = taosArrayInsert(pWinStates, 0, &pPos);
21✔
330
    if (!tmp) {
21!
UNCOV
331
      code = terrno;
×
UNCOV
332
      QUERY_CHECK_CODE(code, lino, _end);
×
333
    }
334
  }
335

336
_end:
21✔
337
  pPos->needFree = false;
141✔
338
  if (code != TSDB_CODE_SUCCESS) {
141!
UNCOV
339
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
340
  }
341
  return code;
141✔
342
}
343

344
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen,
346✔
345
                              int32_t* pWinCode) {
346
  int32_t      code = TSDB_CODE_SUCCESS;
346✔
347
  int32_t      lino = 0;
346✔
348
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
346✔
349
  if (!pNewPos || !pNewPos->pRowBuff) {
346!
UNCOV
350
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
351
    QUERY_CHECK_CODE(code, lino, _end);
×
352
  }
353
  pNewPos->needFree = true;
346✔
354
  pNewPos->beFlushed = true;
346✔
355
  void* pBuff = NULL;
346✔
356
  (*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
346✔
357
  if ((*pWinCode) != TSDB_CODE_SUCCESS) {
346!
UNCOV
358
    goto _end;
×
359
  }
360
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
346✔
361
  memcpy(pNewPos->pRowBuff, pBuff, *pVLen);
346✔
362
  taosMemoryFreeClear(pBuff);
346!
363
  (*pVal) = pNewPos;
346✔
364

365
_end:
346✔
366
  if (code != TSDB_CODE_SUCCESS) {
346!
UNCOV
367
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
368
  }
369
  return code;
346✔
370
}
371

372
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) {
2,010✔
373
  SSHashObj*   pSessionBuff = (SSHashObj*)pBuff;
2,010✔
374
  SSessionKey* pWinKey = (SSessionKey*)key;
2,010✔
375
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
2,010✔
376
  if (!ppBuff) {
2,010✔
377
    return TSDB_CODE_SUCCESS;
523✔
378
  }
379
  SArray* pWinStates = (SArray*)(*ppBuff);
1,487✔
380
  int32_t size = taosArrayGetSize(pWinStates);
1,487✔
381
  TSKEY   gap = 0;
1,487✔
382
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
1,487✔
383
  if (index >= 0) {
1,487✔
384
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
1,057✔
385
    if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) {
1,056!
386
      pPos->beFlushed = true;
1,056✔
387
      taosArrayRemove(pWinStates, index);
1,056✔
388
    }
389
  }
390
  return TSDB_CODE_SUCCESS;
1,486✔
391
}
392

393
void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
125✔
394
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
125✔
395
  SSessionKey* pWinKey = (SSessionKey*)pPos->pKey;
125✔
396
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
125✔
397
  if (!ppBuff) {
125!
UNCOV
398
    return;
×
399
  }
400
  SArray* pWinStates = (SArray*)(*ppBuff);
125✔
401
  int32_t size = taosArrayGetSize(pWinStates);
125✔
402
  TSKEY   gap = 0;
125✔
403
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
125✔
404
  if (index >= 0) {
125✔
405
    SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index);
26✔
406
    if (pItemPos == pPos) {
26!
407
      taosArrayRemove(pWinStates, index);
26✔
408
    }
409
  }
410
}
411

412
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
473✔
413
                                           const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
414
  int32_t      code = TSDB_CODE_SUCCESS;
473✔
415
  int32_t      lino = 0;
473✔
416
  SRowBuffPos* pNewPos = NULL;
473✔
417
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
473✔
418
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
473✔
419
  SArray*      pWinStates = NULL;
474✔
420
  if (!ppBuff) {
474✔
421
    pWinStates = taosArrayInit(16, POINTER_BYTES);
89✔
422
    if (!pWinStates) {
89!
UNCOV
423
      code = terrno;
×
UNCOV
424
      QUERY_CHECK_CODE(code, lino, _end);
×
425
    }
426

427
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
89✔
428
    QUERY_CHECK_CODE(code, lino, _end);
89!
429
  } else {
430
    pWinStates = (SArray*)(*ppBuff);
385✔
431
  }
432
  if (!pCur) {
474✔
433
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
93✔
434
    QUERY_CHECK_CODE(code, lino, _end);
93!
435

436
    goto _end;
93✔
437
  }
438

439
  int32_t size = taosArrayGetSize(pWinStates);
381✔
440
  if (pCur->buffIndex >= 0) {
381✔
441
    if (pCur->buffIndex >= size) {
359✔
442
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
332✔
443
      QUERY_CHECK_CODE(code, lino, _end);
332!
444

445
      goto _end;
332✔
446
    }
447
    code = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex, &pNewPos);
27✔
448
    QUERY_CHECK_CODE(code, lino, _end);
27!
449

450
    goto _end;
27✔
451
  } else {
452
    if (size > 0) {
22✔
453
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
20✔
454
      if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >= 0) {
20!
455
        // pCur is invalid
UNCOV
456
        SSessionKey pTmpKey = *pWinKey;
×
UNCOV
457
        int32_t     winCode = TSDB_CODE_SUCCESS;
×
458
        code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen, &winCode);
×
459
        QUERY_CHECK_CONDITION((winCode == TSDB_CODE_FAILED), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
460
        QUERY_CHECK_CODE(code, lino, _end);
×
461
        goto _end;
×
462
      }
463
    }
464
    pNewPos = getNewRowPosForWrite(pFileState);
22✔
465
    if (!pNewPos || !pNewPos->pRowBuff) {
22!
UNCOV
466
      code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
467
      QUERY_CHECK_CODE(code, lino, _end);
×
468
    }
469

470
    memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
22✔
471
    pNewPos->needFree = true;
22✔
472
    pNewPos->beFlushed = true;
22✔
473
  }
474

475
_end:
474✔
476
  (*ppVal) = pNewPos;
474✔
477
  if (code != TSDB_CODE_SUCCESS) {
474!
UNCOV
478
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
479
  }
480
  return code;
474✔
481
}
482

483
void sessionWinStateClear(SStreamFileState* pFileState) {
807✔
484
  int32_t buffSize = getRowStateRowSize(pFileState);
807✔
485
  void*   pIte = NULL;
807✔
486
  size_t  keyLen = 0;
807✔
487
  int32_t iter = 0;
807✔
488
  void*   pBuff = getRowStateBuff(pFileState);
807✔
489
  while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
3,461✔
490
    SArray* pWinStates = *((void**)pIte);
2,654✔
491
    int32_t size = taosArrayGetSize(pWinStates);
2,654✔
492
    for (int32_t i = 0; i < size; i++) {
5,580✔
493
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
2,926✔
494
      memset(pPos->pRowBuff, 0, buffSize);
2,926✔
495
    }
496
  }
497
}
807✔
498

499
void sessionWinStateCleanup(void* pBuff) {
7,354✔
500
  void*   pIte = NULL;
7,354✔
501
  size_t  keyLen = 0;
7,354✔
502
  int32_t iter = 0;
7,354✔
503
  while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
10,528✔
504
    SArray* pWinStates = (SArray*)(*(void**)pIte);
3,174✔
505
    taosArrayDestroy(pWinStates);
3,174✔
506
  }
507
  tSimpleHashCleanup(pBuff);
7,356✔
508
}
7,356✔
509

510
static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, const SSessionKey* pWinKey,
13,798✔
511
                                                SArray** pWins, int32_t* pIndex) {
512
  SStreamStateCur* pCur = NULL;
13,798✔
513
  SSHashObj*       pSessionBuff = getRowStateBuff(pFileState);
13,798✔
514
  void**           ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
13,799✔
515
  if (!ppBuff) {
13,802✔
516
    return NULL;
1,132✔
517
  }
518

519
  SArray* pWinStates = (SArray*)(*ppBuff);
12,670✔
520
  int32_t size = taosArrayGetSize(pWinStates);
12,670✔
521
  TSKEY   gap = 0;
12,669✔
522
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
12,669✔
523

524
  if (pWins) {
12,656✔
525
    (*pWins) = pWinStates;
7,893✔
526
  }
527

528
  if (size > 0 && index == -1) {
12,656✔
529
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
1,633✔
530
    SSessionKey* pWin = (SSessionKey*)pPos->pKey;
1,632✔
531
    if (pWinKey->win.skey == pWin->win.skey) {
1,632✔
532
      index = 0;
290✔
533
    }
534
  }
535

536
  if (index >= 0) {
12,655✔
537
    pCur = createStateCursor(pFileState);
9,973✔
538
    if (pCur == NULL) {
9,988!
UNCOV
539
      return NULL;
×
540
    }
541
    pCur->buffIndex = index;
9,988✔
542
    if (pIndex) {
9,988✔
543
      *pIndex = index;
6,665✔
544
    }
545
  }
546

547
  return pCur;
12,670✔
548
}
549

550
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
5,069✔
551
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, NULL, NULL);
5,069✔
552
  if (pCur) {
5,072✔
553
    return pCur;
3,323✔
554
  }
555

556
  void* pFileStore = getStateFileStore(pFileState);
1,749✔
557
  pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pFileStore, pWinKey);
1,749✔
558
  if (!pCur) {
1,748✔
559
    return NULL;
729✔
560
  }
561
  pCur->buffIndex = -1;
1,019✔
562
  pCur->pStreamFileState = pFileState;
1,019✔
563
  return pCur;
1,019✔
564
}
565
static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) {
489✔
566
  if (!pCur) {
489!
UNCOV
567
    return;
×
568
  }
569
  streamStateResetCur(pCur);
489✔
570
  pCur->buffIndex = 0;
489✔
571
  pCur->pStreamFileState = pFileState;
489✔
572
}
573

574
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
1,851✔
575
                                    SStreamStateCur** ppCur) {
576
  SSessionKey key = {.groupId = groupId};
1,851✔
577
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), *ppCur, &key, NULL, NULL);
1,851✔
578
  if (taosArrayGetSize(pWinStates) > 0 &&
1,851✔
579
      (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
74!
580
    if (!(*ppCur)) {
467✔
581
      (*ppCur) = createStateCursor(pFileState);
465✔
582
    }
583
    transformCursor(pFileState, *ppCur);
467✔
584
  } else if (*ppCur) {
1,384✔
585
    (*ppCur)->buffIndex = -1;
550✔
586
    (*ppCur)->pStreamFileState = pFileState;
550✔
587
  }
588
}
1,851✔
589

590
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
342✔
591
  SArray*          pWinStates = NULL;
342✔
592
  int32_t          index = -1;
342✔
593
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
342✔
594
  if (pCur) {
342✔
595
    if (sessionStateRangeKeyCompare(pWinKey, pWinStates, index) > 0) {
157✔
596
      sessionWinStateMoveToNext(pCur);
25✔
597
    }
598
    return pCur;
158✔
599
  }
600

601
  void* pFileStore = getStateFileStore(pFileState);
185✔
602
  pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pFileStore, (SSessionKey*)pWinKey);
184✔
603
  checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
184✔
604
  return pCur;
184✔
605
}
606

607
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
8,008✔
608
  SArray*          pWinStates = NULL;
8,008✔
609
  int32_t          index = -1;
8,008✔
610
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
8,008✔
611
  if (pCur) {
8,010✔
612
    sessionWinStateMoveToNext(pCur);
6,344✔
613
    return pCur;
6,344✔
614
  }
615

616
  void* pFileStore = getStateFileStore(pFileState);
1,666✔
617
  pCur = streamStateSessionSeekKeyNext_rocksdb(pFileStore, pWinKey);
1,666✔
618
  checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
1,667✔
619
  return pCur;
1,666✔
620
}
621

622
SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey, COUNT_TYPE count) {
378✔
623
  SArray*          pWinStates = NULL;
378✔
624
  int32_t          index = -1;
378✔
625
  SStreamStateCur* pBuffCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
378✔
626
  int32_t          resSize = getRowStateRowSize(pFileState);
378✔
627
  COUNT_TYPE       winCount = 0;
378✔
628
  if (pBuffCur) {
378✔
629
    while (index >= 0) {
341✔
630
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
210✔
631
      winCount = *((COUNT_TYPE*)((char*)pPos->pRowBuff + (resSize - sizeof(COUNT_TYPE))));
210✔
632
      if (sessionStateRangeKeyCompare(pWinKey, pWinStates, index) == 0 || winCount < count) {
210✔
633
        index--;
178✔
634
      } else if (index >= 0) {
32!
635
        pBuffCur->buffIndex = index + 1;
32✔
636
        return pBuffCur;
32✔
637
      }
638
    }
639
    pBuffCur->buffIndex = 0;
131✔
640
  } else if (taosArrayGetSize(pWinStates) > 0) {
215!
UNCOV
641
    pBuffCur = createStateCursor(pFileState);
×
UNCOV
642
    if (pBuffCur == NULL) {
×
643
      return NULL;
×
644
    }
645
    pBuffCur->buffIndex = 0;
×
646
  }
647

648
  void*            pFileStore = getStateFileStore(pFileState);
346✔
649
  SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
346✔
650
  if (pCur) {
345!
UNCOV
651
    pCur->pStreamFileState = pFileState;
×
UNCOV
652
    SSessionKey key = {0};
×
653
    void*       pVal = NULL;
×
654
    int         len = 0;
×
655
    int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &len);
×
656
    if (code == TSDB_CODE_FAILED) {
×
657
      streamStateFreeCur(pCur);
×
658
      return pBuffCur;
×
659
    }
660
    winCount = *((COUNT_TYPE*)((char*)pVal + (resSize - sizeof(COUNT_TYPE))));
×
UNCOV
661
    taosMemoryFreeClear(pVal);
×
662
    streamStateFreeCur(pBuffCur);
×
663
    if (sessionRangeKeyCmpr(pWinKey, &key) != 0 && winCount == count) {
×
664
      streamStateCurNext(pFileStore, pCur);
×
665
      return pCur;
×
666
    }
667
    streamStateCurPrev(pFileStore, pCur);
×
668
    while (1) {
669
      code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len);
×
UNCOV
670
      if (code == TSDB_CODE_FAILED) {
×
671
        streamStateCurNext(pFileStore, pCur);
×
672
        return pCur;
×
673
      }
674
      winCount = *((COUNT_TYPE*)((char*)pVal + (resSize - sizeof(COUNT_TYPE))));
×
UNCOV
675
      taosMemoryFreeClear(pVal);
×
676
      if (sessionRangeKeyCmpr(pWinKey, &key) == 0 || winCount < count) {
×
677
        streamStateCurPrev(pFileStore, pCur);
×
678
      } else {
679
        streamStateCurNext(pFileStore, pCur);
×
UNCOV
680
        return pCur;
×
681
      }
682
    }
683
  }
684
  return pBuffCur;
345✔
685
}
686

687
int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
15,378✔
688
  if (!pCur) {
15,378✔
689
    return TSDB_CODE_FAILED;
1,870✔
690
  }
691
  int32_t code = TSDB_CODE_SUCCESS;
13,508✔
692

693
  SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState);
13,508✔
694
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
13,509✔
695
  SArray*    pWinStates = NULL;
13,513✔
696
  if (ppBuff) {
13,513✔
697
    pWinStates = (SArray*)(*ppBuff);
13,094✔
698
  }
699

700
  if (pCur->buffIndex >= 0) {
13,513✔
701
    int32_t size = taosArrayGetSize(pWinStates);
11,833✔
702
    if (pCur->buffIndex >= size) {
11,833✔
703
      return TSDB_CODE_FAILED;
6,888✔
704
    }
705
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
4,945✔
706
    if (pVal) {
4,943✔
707
      *pVal = pPos;
1,118✔
708
    }
709
    *pKey = *(SSessionKey*)(pPos->pKey);
4,943✔
710
  } else {
711
    void* pData = NULL;
1,680✔
712
    code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, &pData, pVLen);
1,680✔
713
    if (taosArrayGetSize(pWinStates) > 0 &&
1,680✔
714
        (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) {
699!
715
      transformCursor(pCur->pStreamFileState, pCur);
22✔
716
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
22✔
717
      if (pVal) {
22!
UNCOV
718
        *pVal = pPos;
×
719
      }
720
      *pKey = *(SSessionKey*)(pPos->pKey);
22✔
721
      code = TSDB_CODE_SUCCESS;
22✔
722
    } else if (code == TSDB_CODE_SUCCESS && pVal) {
1,658✔
723
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState);
136✔
724
      if (!pNewPos || !pNewPos->pRowBuff) {
136!
UNCOV
725
        code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
726
        taosMemoryFreeClear(pData);
×
727
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
728
        return code;
×
729
      }
730
      memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
136✔
731
      pNewPos->needFree = true;
136✔
732
      pNewPos->beFlushed = true;
136✔
733
      memcpy(pNewPos->pRowBuff, pData, *pVLen);
136✔
734
      (*pVal) = pNewPos;
136✔
735
    }
736
    taosMemoryFreeClear(pData);
1,680✔
737
  }
738
  return code;
6,623✔
739
}
740

741
void sessionWinStateMoveToNext(SStreamStateCur* pCur) {
11,427✔
742
  qTrace("move cursor to next");
11,427!
743
  if (pCur && pCur->buffIndex >= 0) {
11,427✔
744
    pCur->buffIndex++;
7,756✔
745
  } else {
746
    streamStateCurNext_rocksdb(pCur);
3,671✔
747
  }
748
}
11,429✔
749

750
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey,
4,522✔
751
                                     range_cmpr_fn cmpFn) {
752
  SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key);
4,522✔
753
  SSessionKey      tmpKey = *key;
4,522✔
754
  int32_t          code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
4,522✔
755
  bool             hasCurrentPrev = true;
4,519✔
756
  if (code == TSDB_CODE_FAILED) {
4,519✔
757
    streamStateFreeCur(pCur);
848✔
758
    pCur = sessionWinStateSeekKeyNext(pFileState, key);
848✔
759
    code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
850✔
760
    hasCurrentPrev = false;
850✔
761
  }
762

763
  if (code == TSDB_CODE_FAILED) {
4,521✔
764
    code = TSDB_CODE_FAILED;
614✔
765
    goto _end;
614✔
766
  }
767

768
  if (cmpFn(key, &tmpKey) == 0) {
3,907✔
769
    *curKey = tmpKey;
2,896✔
770
    goto _end;
2,896✔
771
  } else if (!hasCurrentPrev) {
1,011✔
772
    code = TSDB_CODE_FAILED;
215✔
773
    goto _end;
215✔
774
  }
775

776
  sessionWinStateMoveToNext(pCur);
796✔
777
  code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
797✔
778
  if (code == TSDB_CODE_SUCCESS && cmpFn(key, &tmpKey) == 0) {
799✔
779
    *curKey = tmpKey;
365✔
780
  } else {
781
    code = TSDB_CODE_FAILED;
434✔
782
  }
783

784
_end:
4,524✔
785
  streamStateFreeCur(pCur);
4,524✔
786
  return code;
4,523✔
787
}
788

789
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
3,025✔
790
                              state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
791
  int32_t code = TSDB_CODE_SUCCESS;
3,025✔
792
  int32_t lino = 0;
3,025✔
793
  (*pWinCode) = TSDB_CODE_SUCCESS;
3,025✔
794

795
  SSessionKey* pWinKey = key;
3,025✔
796
  TSKEY        gap = 0;
3,025✔
797
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
3,025✔
798
  SArray*      pWinStates = NULL;
3,025✔
799

800
  void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
3,025✔
801
  if (ppBuff) {
3,025✔
802
    pWinStates = (SArray*)(*ppBuff);
2,598✔
803
  } else {
804
    pWinStates = taosArrayInit(16, POINTER_BYTES);
427✔
805
    if (!pWinStates) {
427!
UNCOV
806
      code = terrno;
×
UNCOV
807
      QUERY_CHECK_CODE(code, lino, _end);
×
808
    }
809

810
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
427✔
811
    QUERY_CHECK_CODE(code, lino, _end);
427!
812
  }
813

814
  TSKEY startTs = pWinKey->win.skey;
3,025✔
815
  TSKEY endTs = pWinKey->win.ekey;
3,025✔
816

817
  int32_t size = taosArrayGetSize(pWinStates);
3,025✔
818
  if (size == 0) {
3,025✔
819
    void*   pFileStore = getStateFileStore(pFileState);
735✔
820
    void*   p = NULL;
735✔
821
    int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
735✔
822
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
735✔
823
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
26✔
824
      if (!(*pVal)) {
26!
UNCOV
825
        code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
826
        QUERY_CHECK_CODE(code, lino, _end);
×
827
      }
828

829
      (*pWinCode) = code_file;
26✔
830
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
26✔
831
             pWinKey->win.ekey, code_file);
832
    } else {
833
      code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
709✔
834
      (*pWinCode) = TSDB_CODE_FAILED;
709✔
835
      taosMemoryFree(p);
709✔
836
      QUERY_CHECK_CODE(code, lino, _end);
709!
837
    }
838
    goto _end;
735✔
839
  }
840

841
  // find the first position which is smaller than the pWinKey
842
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
2,290✔
843
  SRowBuffPos* pPos = NULL;
2,290✔
844
  int32_t      valSize = *pVLen;
2,290✔
845

846
  if (index >= 0) {
2,290✔
847
    pPos = taosArrayGetP(pWinStates, index);
2,269✔
848
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
2,269✔
849
    if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) {
2,269✔
850
      (*pVal) = pPos;
1,930✔
851
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
1,930✔
852
      pPos->beUsed = true;
1,930✔
853
      pPos->beFlushed = false;
1,930✔
854
      *key = *pDestWinKey;
1,930✔
855
      goto _end;
1,930✔
856
    }
857
  }
858

859
  if (index + 1 < size) {
360✔
860
    pPos = taosArrayGetP(pWinStates, index + 1);
34✔
861
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
34✔
862
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ||
66!
863
        fn(pKeyData, stateKey) == true) {
32✔
864
      (*pVal) = pPos;
4✔
865
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
4✔
866
      pPos->beUsed = true;
4✔
867
      pPos->beFlushed = false;
4✔
868
      *key = *pDestWinKey;
4✔
869
      goto _end;
4✔
870
    }
871
  }
872

873
  if (index + 1 == 0) {
356✔
874
    if (!isDeteled(pFileState, endTs)) {
19!
875
      void*   p = NULL;
19✔
876
      void*   pFileStore = getStateFileStore(pFileState);
19✔
877
      int32_t code_file =
878
          streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
19✔
879
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
19✔
880
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
10✔
881
        if (!(*pVal)) {
10!
UNCOV
882
          code = TSDB_CODE_OUT_OF_MEMORY;
×
883
          QUERY_CHECK_CODE(code, lino, _end);
10!
884
        }
885

886
        (*pWinCode) = code_file;
10✔
887
        qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
10✔
888
               pWinKey->win.ekey, code_file);
889
        goto _end;
10✔
890
      } else {
891
        taosMemoryFree(p);
9✔
892
      }
893
    }
894
  }
895

896
  if (index == size - 1) {
346✔
897
    code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
326✔
898
    QUERY_CHECK_CODE(code, lino, _end);
326!
899

900
    (*pWinCode) = TSDB_CODE_FAILED;
326✔
901
    goto _end;
326✔
902
  }
903
  code = insertNewSessionWindow(pFileState, pWinStates, key, index + 1, (SRowBuffPos**)pVal);
20✔
904
  QUERY_CHECK_CODE(code, lino, _end);
20!
905

906
  (*pWinCode) = TSDB_CODE_FAILED;
20✔
907

908
_end:
3,025✔
909
  if (code != TSDB_CODE_SUCCESS) {
3,025!
UNCOV
910
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
911
  }
912
  return code;
3,025✔
913
}
914

915
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
1,544✔
916
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
1,544✔
917
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
1,544✔
918
  streamStateFreeCur(pCur);
1,544✔
919
  if (code == TSDB_CODE_SUCCESS) {
1,544!
UNCOV
920
    return code;
×
921
  } else {
922
    pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
1,544✔
923
  }
924

925
  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
1,544✔
926
  streamStateFreeCur(pCur);
1,544✔
927
  return code;
1,544✔
928
}
929

930
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
4,823✔
931
                              int32_t* pVLen, int32_t* pWinCount) {
932
  int32_t code = TSDB_CODE_SUCCESS;
4,823✔
933
  int32_t lino = 0;
4,823✔
934
  (*pWinCount) = TSDB_CODE_SUCCESS;
4,823✔
935

936
  SSessionKey* pWinKey = pKey;
4,823✔
937
  const TSKEY  gap = 0;
4,823✔
938
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
4,823✔
939
  SArray*      pWinStates = NULL;
4,828✔
940
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
4,828✔
941
  if (ppBuff) {
4,832✔
942
    pWinStates = (SArray*)(*ppBuff);
3,511✔
943
  } else {
944
    pWinStates = taosArrayInit(16, POINTER_BYTES);
1,321✔
945
    if (!pWinStates) {
1,321!
UNCOV
946
      code = terrno;
×
UNCOV
947
      QUERY_CHECK_CODE(code, lino, _end);
×
948
    }
949

950
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
1,321✔
951
    QUERY_CHECK_CODE(code, lino, _end);
1,320!
952
  }
953

954
  TSKEY startTs = pWinKey->win.skey;
4,831✔
955
  TSKEY endTs = pWinKey->win.ekey;
4,831✔
956

957
  int32_t size = taosArrayGetSize(pWinStates);
4,831✔
958
  if (size == 0) {
4,831✔
959
    void* pFileStore = getStateFileStore(pFileState);
1,329✔
960
    void* pRockVal = NULL;
1,329✔
961
    (*pWinCount) = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
1,329✔
962
    if ((*pWinCount) == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
1,329!
963
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
56✔
964
             pWinKey->win.ekey, (*pWinCount));
965
      if ((*pWinCount) == TSDB_CODE_SUCCESS) {
56!
966
        int32_t     valSize = *pVLen;
56✔
967
        COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
56✔
968
        if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
56!
969
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
48✔
970
          if (!(*pVal)) {
48!
UNCOV
971
            code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
972
            QUERY_CHECK_CODE(code, lino, _end);
×
973
          }
974

975
          goto _end;
48✔
976
        }
977
      }
978
      pWinKey->win.skey = startTs;
8✔
979
      pWinKey->win.ekey = endTs;
8✔
980
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
8✔
981
      taosMemoryFree(pRockVal);
8✔
982
      if (!(*pVal)) {
8!
UNCOV
983
        code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
984
        QUERY_CHECK_CODE(code, lino, _end);
×
985
      }
986
    } else {
987
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
1,273✔
988
      QUERY_CHECK_CODE(code, lino, _end);
1,273!
989

990
      (*pWinCount) = TSDB_CODE_FAILED;
1,273✔
991
    }
992
    goto _end;
1,281✔
993
  }
994

995
  // find the first position which is smaller than the pWinKey
996
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
3,502✔
997
  SRowBuffPos* pPos = NULL;
3,493✔
998
  int32_t      valSize = *pVLen;
3,493✔
999

1000
  if (index >= 0) {
3,493✔
1001
    pPos = taosArrayGetP(pWinStates, index);
3,486✔
1002
    COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pPos->pRowBuff) + (valSize - sizeof(COUNT_TYPE)));
3,490✔
1003
    if (inSessionWindow(pPos->pKey, startTs, gap) || (index == size - 1 && (*pWinStateCout) < winCount)) {
3,490✔
1004
      (*pVal) = pPos;
2,825✔
1005
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
2,825✔
1006
      pPos->beUsed = true;
2,825✔
1007
      pPos->beFlushed = false;
2,825✔
1008
      *pWinKey = *pDestWinKey;
2,825✔
1009
      goto _end;
2,825✔
1010
    }
1011
  }
1012

1013
  if (index == -1) {
673✔
1014
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) {
10!
UNCOV
1015
      SSessionKey tmpKey = *pWinKey;
×
UNCOV
1016
      void*       pRockVal = NULL;
×
1017
      void*       pFileStore = getStateFileStore(pFileState);
×
1018
      int32_t     code_file = getCountWinStateFromDisc(pFileStore, &tmpKey, &pRockVal, pVLen);
×
1019
      if (code_file == TSDB_CODE_SUCCESS) {
×
1020
        SRowBuffPos* pFirstPos = taosArrayGetP(pWinStates, 0);
×
1021
        SSessionKey* pFirstWinKey = (SSessionKey*)pFirstPos->pKey;
×
1022
        if (tmpKey.win.ekey < pFirstWinKey->win.skey) {
×
1023
          *pWinKey = tmpKey;
×
1024
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
×
1025
          if (!(*pVal)) {
×
1026
            code = TSDB_CODE_OUT_OF_MEMORY;
×
1027
            QUERY_CHECK_CODE(code, lino, _end);
×
1028
          }
1029

UNCOV
1030
          (*pWinCount) = code_file;
×
UNCOV
1031
          qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
1032
                 pWinKey->win.ekey, code_file);
1033
          goto _end;
×
1034
        }
1035
      }
UNCOV
1036
      taosMemoryFree(pRockVal);
×
1037
    }
1038
  }
1039

1040
  if (index + 1 < size) {
676✔
1041
    pPos = taosArrayGetP(pWinStates, index + 1);
42✔
1042
    (*pVal) = pPos;
42✔
1043
    SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
42✔
1044
    pPos->beUsed = true;
42✔
1045
    pPos->beFlushed = false;
42✔
1046
    *pWinKey = *pDestWinKey;
42✔
1047
    goto _end;
42✔
1048
  }
1049

1050
  code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
634✔
1051
  QUERY_CHECK_CODE(code, lino, _end);
634!
1052

1053
  (*pWinCount) = TSDB_CODE_FAILED;
634✔
1054

1055
_end:
4,830✔
1056
  if (code != TSDB_CODE_SUCCESS) {
4,830!
UNCOV
1057
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1058
  }
1059
  return code;
4,827✔
1060
}
1061

1062
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
250✔
1063
                                 int32_t* pVLen) {
1064
  SSessionKey* pWinKey = pKey;
250✔
1065
  const TSKEY  gap = 0;
250✔
1066
  int32_t      code = TSDB_CODE_SUCCESS;
250✔
1067
  int32_t      lino = 0;
250✔
1068
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
250✔
1069
  SArray*      pWinStates = NULL;
250✔
1070
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
250✔
1071
  if (ppBuff) {
250✔
1072
    pWinStates = (SArray*)(*ppBuff);
45✔
1073
  } else {
1074
    pWinStates = taosArrayInit(16, POINTER_BYTES);
205✔
1075
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
205✔
1076
    QUERY_CHECK_CODE(code, lino, _end);
205!
1077
  }
1078

1079
  TSKEY startTs = pWinKey->win.skey;
250✔
1080
  TSKEY endTs = pWinKey->win.ekey;
250✔
1081

1082
  int32_t size = taosArrayGetSize(pWinStates);
250✔
1083
  if (size == 0) {
250✔
1084
    void* pFileStore = getStateFileStore(pFileState);
215✔
1085
    void* pRockVal = NULL;
215✔
1086

1087
    int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
215✔
1088
    if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) {
215!
UNCOV
1089
      int32_t     valSize = *pVLen;
×
UNCOV
1090
      COUNT_TYPE* pWinStateCount = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
×
1091
      if ((*pWinStateCount) == winCount) {
×
1092
        code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
×
1093
        QUERY_CHECK_CODE(code, lino, _end);
×
1094
      } else {
1095
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
×
UNCOV
1096
        if (!(*pVal)) {
×
1097
          code = TSDB_CODE_OUT_OF_MEMORY;
×
1098
          QUERY_CHECK_CODE(code, lino, _end);
×
1099
        }
1100
        qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
1101
               pWinKey->win.ekey, code_file);
1102
      }
1103
    } else {
1104
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
215✔
1105
      taosMemoryFree(pRockVal);
215✔
1106
      QUERY_CHECK_CODE(code, lino, _end);
215!
1107
    }
1108
  } else {
1109
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
35✔
1110
    QUERY_CHECK_CODE(code, lino, _end);
35!
1111
  }
1112

1113
_end:
35✔
1114
  if (code != TSDB_CODE_SUCCESS) {
250!
UNCOV
1115
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1116
  }
1117
  return code;
250✔
1118
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc