• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3720

26 Mar 2025 06:20AM UTC coverage: 30.242% (-31.7%) from 61.936%
#3720

push

travis-ci

web-flow
feat(taosBenchmark): supports decimal data type (#30456)

* feat: taosBenchmark supports decimal data type

* build: decimal script not use pytest.sh

* fix: fix typo for decimal script

* test: insertBasic.py debug

71234 of 313946 branches covered (22.69%)

Branch coverage included in aggregate %.

38 of 423 new or added lines in 8 files covered. (8.98%)

120240 existing lines in 447 files now uncovered.

118188 of 312400 relevant lines covered (37.83%)

1450220.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/stream/src/streamSessionState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "tcommon.h"
21
#include "tsimplehash.h"
22

23
#define MAX_SCAN_RANGE_SIZE 102400
24

UNCOV
25
int sessionStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
×
UNCOV
26
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
×
UNCOV
27
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
×
UNCOV
28
  return sessionWinKeyCmpr((SSessionKey*)pWin1, pWin2);
×
29
}
30

UNCOV
31
int sessionStateRangeKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
×
UNCOV
32
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
×
UNCOV
33
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
×
UNCOV
34
  return sessionRangeKeyCmpr(pWin1, pWin2);
×
35
}
36

UNCOV
37
int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn) {
×
UNCOV
38
  int firstPos = 0, lastPos = num - 1, midPos = -1;
×
UNCOV
39
  int numOfRows = 0;
×
40

UNCOV
41
  if (num <= 0) return -1;
×
42
  // find the first position which is smaller or equal than the key.
43
  // if all data is bigger than the key return -1
44
  while (1) {
UNCOV
45
    if (cmpFn(key, keyList, lastPos) >= 0) return lastPos;
×
UNCOV
46
    if (cmpFn(key, keyList, firstPos) == 0) return firstPos;
×
UNCOV
47
    if (cmpFn(key, keyList, firstPos) < 0) return firstPos - 1;
×
48

UNCOV
49
    numOfRows = lastPos - firstPos + 1;
×
UNCOV
50
    midPos = (numOfRows >> 1) + firstPos;
×
51

UNCOV
52
    if (cmpFn(key, keyList, midPos) < 0) {
×
UNCOV
53
      lastPos = midPos - 1;
×
UNCOV
54
    } else if (cmpFn(key, keyList, midPos) > 0) {
×
UNCOV
55
      firstPos = midPos + 1;
×
56
    } else {
UNCOV
57
      break;
×
58
    }
59
  }
60

UNCOV
61
  return midPos;
×
62
}
63

64
int64_t getSessionWindowEndkey(void* data, int32_t index) {
×
65
  SArray*       pWinInfos = (SArray*)data;
×
66
  SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
×
67
  if (ppos != NULL) {
×
68
    SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
×
69
    return pWin->win.ekey;
×
70
  } else {
71
    return 0;
×
72
  }
73
}
74

UNCOV
75
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
×
UNCOV
76
  if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) {
×
UNCOV
77
    return true;
×
78
  }
UNCOV
79
  return false;
×
80
}
81

UNCOV
82
SStreamStateCur* createStateCursor(SStreamFileState* pFileState) {
×
UNCOV
83
  SStreamStateCur* pCur = createStreamStateCursor();
×
UNCOV
84
  if (pCur == NULL) {
×
85
    return NULL;
×
86
  }
UNCOV
87
  pCur->pStreamFileState = pFileState;
×
UNCOV
88
  return pCur;
×
89
}
90

UNCOV
91
static int32_t addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
×
92
                                   SRowBuffPos** ppPos) {
UNCOV
93
  int32_t      code = TSDB_CODE_SUCCESS;
×
UNCOV
94
  int32_t      lino = 0;
×
UNCOV
95
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
UNCOV
96
  if (!pNewPos || !pNewPos->pRowBuff) {
×
97
    code = TSDB_CODE_OUT_OF_MEMORY;
×
98
    QUERY_CHECK_CODE(code, lino, _end);
×
99
  }
100

UNCOV
101
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
×
UNCOV
102
  void* tmp = taosArrayPush(pWinInfos, &pNewPos);
×
UNCOV
103
  if (!tmp) {
×
104
    code = terrno;
×
105
    QUERY_CHECK_CODE(code, lino, _end);
×
106
  }
UNCOV
107
  (*ppPos) = pNewPos;
×
108

UNCOV
109
_end:
×
UNCOV
110
  if (code != TSDB_CODE_SUCCESS) {
×
111
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
112
  }
UNCOV
113
  return code;
×
114
}
115

UNCOV
116
static int32_t insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
×
117
                                      int32_t index, SRowBuffPos** ppPos) {
UNCOV
118
  int32_t      code = TSDB_CODE_SUCCESS;
×
UNCOV
119
  int32_t      lino = 0;
×
UNCOV
120
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
UNCOV
121
  if (!pNewPos || !pNewPos->pRowBuff) {
×
122
    code = TSDB_CODE_OUT_OF_MEMORY;
×
123
    QUERY_CHECK_CODE(code, lino, _end);
×
124
  }
125

UNCOV
126
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
×
UNCOV
127
  void* tmp = taosArrayInsert(pWinInfos, index, &pNewPos);
×
UNCOV
128
  if (!tmp) {
×
129
    code = terrno;
×
130
    QUERY_CHECK_CODE(code, lino, _end);
×
131
  }
132

UNCOV
133
  *ppPos = pNewPos;
×
134

UNCOV
135
_end:
×
UNCOV
136
  if (code != TSDB_CODE_SUCCESS) {
×
137
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
138
  }
UNCOV
139
  return code;
×
140
}
141

UNCOV
142
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) {
×
UNCOV
143
  int32_t      code = TSDB_CODE_SUCCESS;
×
UNCOV
144
  int32_t      lino = 0;
×
UNCOV
145
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
UNCOV
146
  if (!pNewPos || !pNewPos->pRowBuff) {
×
147
    code = TSDB_CODE_OUT_OF_MEMORY;
×
148
    QUERY_CHECK_CODE(code, lino, _end);
×
149
  }
150

UNCOV
151
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
×
UNCOV
152
  pNewPos->needFree = true;
×
UNCOV
153
  pNewPos->beFlushed = true;
×
UNCOV
154
  int32_t len = getRowStateRowSize(pFileState);
×
UNCOV
155
  if (p) {
×
UNCOV
156
    if (*pVLen > len){
×
157
      qError("[InternalERR] read key:[skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],session window buffer is too small, *pVLen:%d, len:%d", pKey->win.skey, pKey->win.ekey, pKey->groupId, *pVLen, len);
×
158
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
159
      QUERY_CHECK_CODE(code, lino, _end);
×
160
    }else{
UNCOV
161
      memcpy(pNewPos->pRowBuff, p, *pVLen);
×
162
    }
163
  } else {
UNCOV
164
    memset(pNewPos->pRowBuff, 0, len);
×
165
  }
166

UNCOV
167
_end:
×
UNCOV
168
  taosMemoryFree(p);
×
UNCOV
169
  if (code != TSDB_CODE_SUCCESS) {
×
170
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
171
    return NULL;
×
172
  }
UNCOV
173
  return pNewPos;
×
174
}
175

UNCOV
176
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen,
×
177
                                int32_t* pWinCode) {
UNCOV
178
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
179
  int32_t lino = 0;
×
UNCOV
180
  (*pWinCode) = TSDB_CODE_SUCCESS;
×
UNCOV
181
  SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
×
UNCOV
182
  SArray*    pWinStates = NULL;
×
UNCOV
183
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
×
UNCOV
184
  if (ppBuff) {
×
UNCOV
185
    pWinStates = (SArray*)(*ppBuff);
×
186
  } else {
UNCOV
187
    pWinStates = taosArrayInit(16, POINTER_BYTES);
×
UNCOV
188
    if (!pWinStates) {
×
189
      code = terrno;
×
190
      QUERY_CHECK_CODE(code, lino, _end);
×
191
    }
UNCOV
192
    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
×
UNCOV
193
    QUERY_CHECK_CODE(code, lino, _end);
×
194
  }
195

UNCOV
196
  TSKEY startTs = pKey->win.skey;
×
UNCOV
197
  TSKEY endTs = pKey->win.ekey;
×
198

UNCOV
199
  int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
200
  if (size == 0) {
×
UNCOV
201
    void*   pFileStore = getStateFileStore(pFileState);
×
UNCOV
202
    void*   p = NULL;
×
UNCOV
203
    int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
×
UNCOV
204
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
×
UNCOV
205
      (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
×
UNCOV
206
      if (!(*pVal)) {
×
207
        code = TSDB_CODE_OUT_OF_MEMORY;
×
208
        QUERY_CHECK_CODE(code, lino, _end);
×
209
      }
210

UNCOV
211
      (*pWinCode) = code_file;
×
UNCOV
212
      qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
×
213
    } else {
UNCOV
214
      code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
×
UNCOV
215
      (*pWinCode) = TSDB_CODE_FAILED;
×
UNCOV
216
      taosMemoryFree(p);
×
UNCOV
217
      QUERY_CHECK_CODE(code, lino, _end);
×
218
    }
UNCOV
219
    goto _end;
×
220
  }
221

222
  // find the first position which is smaller than the pKey
UNCOV
223
  int32_t      index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
×
UNCOV
224
  SRowBuffPos* pPos = NULL;
×
225

UNCOV
226
  if (index >= 0) {
×
UNCOV
227
    pPos = taosArrayGetP(pWinStates, index);
×
UNCOV
228
    if (inSessionWindow(pPos->pKey, startTs, gap)) {
×
UNCOV
229
      (*pVal) = pPos;
×
UNCOV
230
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
×
UNCOV
231
      pPos->beUsed = true;
×
UNCOV
232
      pPos->beFlushed = false;
×
UNCOV
233
      *pKey = *pDestWinKey;
×
UNCOV
234
      goto _end;
×
235
    }
236
  }
237

UNCOV
238
  if (index + 1 < size) {
×
UNCOV
239
    pPos = taosArrayGetP(pWinStates, index + 1);
×
UNCOV
240
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap))) {
×
UNCOV
241
      (*pVal) = pPos;
×
UNCOV
242
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
×
UNCOV
243
      pPos->beUsed = true;
×
UNCOV
244
      pPos->beFlushed = false;
×
UNCOV
245
      *pKey = *pDestWinKey;
×
UNCOV
246
      goto _end;
×
247
    }
248
  }
249

UNCOV
250
  if (index + 1 == 0) {
×
UNCOV
251
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
×
UNCOV
252
      void*   p = NULL;
×
UNCOV
253
      void*   pFileStore = getStateFileStore(pFileState);
×
UNCOV
254
      int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
×
UNCOV
255
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
×
UNCOV
256
        (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
×
UNCOV
257
        if (!(*pVal)) {
×
258
          code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
259
          QUERY_CHECK_CODE(code, lino, _end);
×
260
        }
261

UNCOV
262
        (*pWinCode) = code_file;
×
UNCOV
263
        qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
×
UNCOV
264
        goto _end;
×
265
      } else {
266
        taosMemoryFree(p);
×
267
      }
268
    }
269
  }
270

UNCOV
271
  if (index == size - 1) {
×
UNCOV
272
    code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
×
UNCOV
273
    QUERY_CHECK_CODE(code, lino, _end);
×
274

UNCOV
275
    (*pWinCode) = TSDB_CODE_FAILED;
×
UNCOV
276
    goto _end;
×
277
  }
278

UNCOV
279
  code = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1, (SRowBuffPos**)pVal);
×
UNCOV
280
  QUERY_CHECK_CODE(code, lino, _end);
×
281

UNCOV
282
  (*pWinCode) = TSDB_CODE_FAILED;
×
283

UNCOV
284
_end:
×
UNCOV
285
  if (code != TSDB_CODE_SUCCESS) {
×
286
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
287
  }
UNCOV
288
  return code;
×
289
}
290

UNCOV
291
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
×
292
                          int32_t* pWinCode) {
UNCOV
293
  SWinKey*    pTmpkey = pKey;
×
UNCOV
294
  SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts};
×
UNCOV
295
  return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen, pWinCode);
×
296
}
297

UNCOV
298
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
×
UNCOV
299
  int32_t      code = TSDB_CODE_SUCCESS;
×
UNCOV
300
  int32_t      lino = 0;
×
UNCOV
301
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
×
UNCOV
302
  SSessionKey* pKey = pPos->pKey;
×
UNCOV
303
  SArray*      pWinStates = NULL;
×
UNCOV
304
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
×
UNCOV
305
  if (ppBuff) {
×
UNCOV
306
    pWinStates = (SArray*)(*ppBuff);
×
307
  } else {
UNCOV
308
    pWinStates = taosArrayInit(16, POINTER_BYTES);
×
UNCOV
309
    if (!pWinStates) {
×
310
      code = terrno;
×
311
      QUERY_CHECK_CODE(code, lino, _end);
×
312
    }
313

UNCOV
314
    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
×
UNCOV
315
    QUERY_CHECK_CODE(code, lino, _end);
×
316
  }
317

UNCOV
318
  int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
319
  if (size == 0) {
×
UNCOV
320
    void* tmp = taosArrayPush(pWinStates, &pPos);
×
UNCOV
321
    if (!tmp) {
×
322
      code = terrno;
×
323
      QUERY_CHECK_CODE(code, lino, _end);
×
324
    }
UNCOV
325
    goto _end;
×
326
  }
327

328
  // find the first position which is smaller than the pKey
UNCOV
329
  int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
×
UNCOV
330
  if (index >= 0) {
×
331
    void* tmp = taosArrayInsert(pWinStates, index, &pPos);
×
332
    if (!tmp) {
×
333
      code = terrno;
×
334
      QUERY_CHECK_CODE(code, lino, _end);
×
335
    }
336
  } else {
UNCOV
337
    void* tmp = taosArrayInsert(pWinStates, 0, &pPos);
×
UNCOV
338
    if (!tmp) {
×
339
      code = terrno;
×
340
      QUERY_CHECK_CODE(code, lino, _end);
×
341
    }
342
  }
343

UNCOV
344
_end:
×
UNCOV
345
  pPos->needFree = false;
×
UNCOV
346
  if (code != TSDB_CODE_SUCCESS) {
×
347
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
348
  }
UNCOV
349
  return code;
×
350
}
351

UNCOV
352
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen,
×
353
                              int32_t* pWinCode) {
UNCOV
354
  int32_t      code = TSDB_CODE_SUCCESS;
×
UNCOV
355
  int32_t      lino = 0;
×
UNCOV
356
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
UNCOV
357
  if (!pNewPos || !pNewPos->pRowBuff) {
×
358
    code = TSDB_CODE_OUT_OF_MEMORY;
×
359
    QUERY_CHECK_CODE(code, lino, _end);
×
360
  }
UNCOV
361
  pNewPos->needFree = true;
×
UNCOV
362
  pNewPos->beFlushed = true;
×
UNCOV
363
  void* pBuff = NULL;
×
UNCOV
364
  (*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
×
UNCOV
365
  if ((*pWinCode) != TSDB_CODE_SUCCESS) {
×
366
    goto _end;
×
367
  }
UNCOV
368
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
×
UNCOV
369
  memcpy(pNewPos->pRowBuff, pBuff, *pVLen);
×
UNCOV
370
  taosMemoryFreeClear(pBuff);
×
UNCOV
371
  (*pVal) = pNewPos;
×
372

UNCOV
373
_end:
×
UNCOV
374
  if (code != TSDB_CODE_SUCCESS) {
×
375
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
376
  }
UNCOV
377
  return code;
×
378
}
379

UNCOV
380
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) {
×
UNCOV
381
  SSHashObj*   pSessionBuff = (SSHashObj*)pBuff;
×
UNCOV
382
  SSessionKey* pWinKey = (SSessionKey*)key;
×
UNCOV
383
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
×
UNCOV
384
  if (!ppBuff) {
×
UNCOV
385
    return TSDB_CODE_SUCCESS;
×
386
  }
UNCOV
387
  SArray* pWinStates = (SArray*)(*ppBuff);
×
UNCOV
388
  int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
389
  TSKEY   gap = 0;
×
UNCOV
390
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
×
UNCOV
391
  if (index >= 0) {
×
UNCOV
392
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
×
UNCOV
393
    if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) {
×
UNCOV
394
      pPos->beFlushed = true;
×
UNCOV
395
      taosArrayRemove(pWinStates, index);
×
396
    }
397
  }
UNCOV
398
  return TSDB_CODE_SUCCESS;
×
399
}
400

UNCOV
401
void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
×
UNCOV
402
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
×
UNCOV
403
  SSessionKey* pWinKey = (SSessionKey*)pPos->pKey;
×
UNCOV
404
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
×
UNCOV
405
  if (!ppBuff) {
×
406
    return;
×
407
  }
UNCOV
408
  SArray* pWinStates = (SArray*)(*ppBuff);
×
UNCOV
409
  int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
410
  TSKEY   gap = 0;
×
UNCOV
411
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
×
UNCOV
412
  if (index >= 0) {
×
UNCOV
413
    SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index);
×
UNCOV
414
    if (pItemPos == pPos) {
×
UNCOV
415
      taosArrayRemove(pWinStates, index);
×
416
    }
417
  }
418
}
419

UNCOV
420
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
×
421
                                           const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
UNCOV
422
  int32_t      code = TSDB_CODE_SUCCESS;
×
UNCOV
423
  int32_t      lino = 0;
×
UNCOV
424
  SRowBuffPos* pNewPos = NULL;
×
UNCOV
425
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
×
UNCOV
426
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
×
UNCOV
427
  SArray*      pWinStates = NULL;
×
UNCOV
428
  if (!ppBuff) {
×
UNCOV
429
    pWinStates = taosArrayInit(16, POINTER_BYTES);
×
UNCOV
430
    if (!pWinStates) {
×
431
      code = terrno;
×
432
      QUERY_CHECK_CODE(code, lino, _end);
×
433
    }
434

UNCOV
435
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
×
UNCOV
436
    QUERY_CHECK_CODE(code, lino, _end);
×
437
  } else {
UNCOV
438
    pWinStates = (SArray*)(*ppBuff);
×
439
  }
UNCOV
440
  if (!pCur) {
×
UNCOV
441
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
×
UNCOV
442
    QUERY_CHECK_CODE(code, lino, _end);
×
443

UNCOV
444
    goto _end;
×
445
  }
446

UNCOV
447
  int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
448
  if (pCur->buffIndex >= 0) {
×
UNCOV
449
    if (pCur->buffIndex >= size) {
×
UNCOV
450
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
×
UNCOV
451
      QUERY_CHECK_CODE(code, lino, _end);
×
452

UNCOV
453
      goto _end;
×
454
    }
UNCOV
455
    code = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex, &pNewPos);
×
UNCOV
456
    QUERY_CHECK_CODE(code, lino, _end);
×
457

UNCOV
458
    goto _end;
×
459
  } else {
UNCOV
460
    if (size > 0) {
×
UNCOV
461
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
×
UNCOV
462
      if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >= 0) {
×
463
        // pCur is invalid
464
        SSessionKey pTmpKey = *pWinKey;
×
465
        int32_t     winCode = TSDB_CODE_SUCCESS;
×
466
        code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen, &winCode);
×
467
        QUERY_CHECK_CONDITION((winCode == TSDB_CODE_FAILED), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
468
        QUERY_CHECK_CODE(code, lino, _end);
×
469
        goto _end;
×
470
      }
471
    }
UNCOV
472
    pNewPos = getNewRowPosForWrite(pFileState);
×
UNCOV
473
    if (!pNewPos || !pNewPos->pRowBuff) {
×
474
      code = TSDB_CODE_OUT_OF_MEMORY;
×
475
      QUERY_CHECK_CODE(code, lino, _end);
×
476
    }
477

UNCOV
478
    memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
×
UNCOV
479
    pNewPos->needFree = true;
×
UNCOV
480
    pNewPos->beFlushed = true;
×
481
  }
482

UNCOV
483
_end:
×
UNCOV
484
  (*ppVal) = pNewPos;
×
UNCOV
485
  if (code != TSDB_CODE_SUCCESS) {
×
486
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
487
  }
UNCOV
488
  return code;
×
489
}
490

UNCOV
491
void sessionWinStateClear(SStreamFileState* pFileState) {
×
UNCOV
492
  int32_t buffSize = getRowStateRowSize(pFileState);
×
UNCOV
493
  void*   pIte = NULL;
×
UNCOV
494
  size_t  keyLen = 0;
×
UNCOV
495
  int32_t iter = 0;
×
UNCOV
496
  void*   pBuff = getRowStateBuff(pFileState);
×
UNCOV
497
  while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
×
UNCOV
498
    SArray* pWinStates = *((void**)pIte);
×
UNCOV
499
    int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
500
    for (int32_t i = 0; i < size; i++) {
×
UNCOV
501
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
×
UNCOV
502
      memset(pPos->pRowBuff, 0, buffSize);
×
503
    }
504
  }
UNCOV
505
}
×
506

UNCOV
507
void freeArrayPtr(void* ptr) {
×
UNCOV
508
  SArray* pArray = *(void**)ptr;
×
UNCOV
509
  taosArrayDestroy(pArray);
×
UNCOV
510
}
×
511

UNCOV
512
void sessionWinStateCleanup(void* pBuff) {
×
UNCOV
513
  if (pBuff == NULL) {
×
UNCOV
514
    return;
×
515
  }
UNCOV
516
  tSimpleHashSetFreeFp(pBuff, freeArrayPtr);
×
UNCOV
517
  tSimpleHashCleanup(pBuff);
×
518
}
519

UNCOV
520
static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, const SSessionKey* pWinKey,
×
521
                                                SArray** pWins, int32_t* pIndex) {
UNCOV
522
  SStreamStateCur* pCur = NULL;
×
UNCOV
523
  SSHashObj*       pSessionBuff = getRowStateBuff(pFileState);
×
UNCOV
524
  void**           ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
×
UNCOV
525
  if (!ppBuff) {
×
UNCOV
526
    return NULL;
×
527
  }
528

UNCOV
529
  SArray* pWinStates = (SArray*)(*ppBuff);
×
UNCOV
530
  int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
531
  TSKEY   gap = 0;
×
UNCOV
532
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
×
533

UNCOV
534
  if (pWins) {
×
UNCOV
535
    (*pWins) = pWinStates;
×
536
  }
537

UNCOV
538
  if (size > 0 && index == -1) {
×
UNCOV
539
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
×
UNCOV
540
    SSessionKey* pWin = (SSessionKey*)pPos->pKey;
×
UNCOV
541
    if (pWinKey->win.skey == pWin->win.skey) {
×
UNCOV
542
      index = 0;
×
543
    }
544
  }
545

UNCOV
546
  if (index >= 0) {
×
UNCOV
547
    pCur = createStateCursor(pFileState);
×
UNCOV
548
    if (pCur == NULL) {
×
549
      return NULL;
×
550
    }
UNCOV
551
    pCur->buffIndex = index;
×
UNCOV
552
    if (pIndex) {
×
UNCOV
553
      *pIndex = index;
×
554
    }
555
  }
556

UNCOV
557
  return pCur;
×
558
}
559

UNCOV
560
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
×
UNCOV
561
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, NULL, NULL);
×
UNCOV
562
  if (pCur) {
×
UNCOV
563
    return pCur;
×
564
  }
565

UNCOV
566
  void* pFileStore = getStateFileStore(pFileState);
×
UNCOV
567
  pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pFileStore, pWinKey);
×
UNCOV
568
  if (!pCur) {
×
UNCOV
569
    return NULL;
×
570
  }
UNCOV
571
  pCur->buffIndex = -1;
×
UNCOV
572
  pCur->pStreamFileState = pFileState;
×
UNCOV
573
  return pCur;
×
574
}
575

576
SStreamStateCur* sessionWinStateSeekKeyPrev(SStreamFileState *pFileState, const SSessionKey *pWinKey) {
×
577
  SArray*          pWinStates = NULL;
×
578
  int32_t          index = -1;
×
579
  SStreamStateCur *pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
×
580
  if (pCur) {
×
581
    int32_t cmpRes= sessionStateRangeKeyCompare(pWinKey, pWinStates, index);
×
582
    if (cmpRes > 0) {
×
583
      return pCur;
×
584
    } else if (cmpRes == 0 && index > 0) {
×
585
      sessionWinStateMoveToPrev(pCur);
×
586
      return pCur;
×
587
    }
588
    streamStateFreeCur(pCur);
×
589
    pCur = NULL;
×
590
  }
591

592
  void* pFileStore = getStateFileStore(pFileState);
×
593
  pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
×
594
  if (!pCur) {
×
595
    return NULL;
×
596
  }
597
  pCur->buffIndex = -1;
×
598
  pCur->pStreamFileState = pFileState;
×
599
  return pCur;
×
600
}
601

UNCOV
602
static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) {
×
UNCOV
603
  if (!pCur) {
×
604
    return;
×
605
  }
UNCOV
606
  streamStateResetCur(pCur);
×
UNCOV
607
  pCur->buffIndex = 0;
×
UNCOV
608
  pCur->pStreamFileState = pFileState;
×
609
}
610

UNCOV
611
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
×
612
                                    SStreamStateCur** ppCur) {
UNCOV
613
  SSessionKey key = {.groupId = groupId};
×
UNCOV
614
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), *ppCur, &key, NULL, NULL);
×
UNCOV
615
  if (taosArrayGetSize(pWinStates) > 0 &&
×
UNCOV
616
      (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
×
UNCOV
617
    if (!(*ppCur)) {
×
UNCOV
618
      (*ppCur) = createStateCursor(pFileState);
×
619
    }
UNCOV
620
    transformCursor(pFileState, *ppCur);
×
UNCOV
621
  } else if (*ppCur) {
×
UNCOV
622
    (*ppCur)->buffIndex = -1;
×
UNCOV
623
    (*ppCur)->pStreamFileState = pFileState;
×
624
  }
UNCOV
625
}
×
626

UNCOV
627
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
×
UNCOV
628
  SArray*          pWinStates = NULL;
×
UNCOV
629
  int32_t          index = -1;
×
UNCOV
630
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
×
UNCOV
631
  if (pCur) {
×
UNCOV
632
    if (sessionStateRangeKeyCompare(pWinKey, pWinStates, index) > 0) {
×
UNCOV
633
      sessionWinStateMoveToNext(pCur);
×
634
    }
UNCOV
635
    return pCur;
×
636
  }
637

UNCOV
638
  void* pFileStore = getStateFileStore(pFileState);
×
UNCOV
639
  pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pFileStore, (SSessionKey*)pWinKey);
×
UNCOV
640
  checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
×
UNCOV
641
  return pCur;
×
642
}
643

UNCOV
644
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
×
UNCOV
645
  SArray*          pWinStates = NULL;
×
UNCOV
646
  int32_t          index = -1;
×
UNCOV
647
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
×
UNCOV
648
  if (pCur) {
×
UNCOV
649
    sessionWinStateMoveToNext(pCur);
×
UNCOV
650
    return pCur;
×
651
  }
652

UNCOV
653
  void* pFileStore = getStateFileStore(pFileState);
×
UNCOV
654
  pCur = streamStateSessionSeekKeyNext_rocksdb(pFileStore, pWinKey);
×
UNCOV
655
  checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
×
UNCOV
656
  return pCur;
×
657
}
658

UNCOV
659
SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey, COUNT_TYPE count) {
×
UNCOV
660
  SArray*          pWinStates = NULL;
×
UNCOV
661
  int32_t          index = -1;
×
UNCOV
662
  SStreamStateCur* pBuffCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
×
UNCOV
663
  int32_t          resSize = getRowStateRowSize(pFileState);
×
UNCOV
664
  COUNT_TYPE       winCount = 0;
×
UNCOV
665
  if (pBuffCur) {
×
UNCOV
666
    while (index >= 0) {
×
UNCOV
667
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
×
UNCOV
668
      winCount = *((COUNT_TYPE*)((char*)pPos->pRowBuff + (resSize - sizeof(COUNT_TYPE))));
×
UNCOV
669
      if (sessionStateRangeKeyCompare(pWinKey, pWinStates, index) == 0 || winCount < count) {
×
UNCOV
670
        index--;
×
UNCOV
671
      } else if (index >= 0) {
×
UNCOV
672
        pBuffCur->buffIndex = index + 1;
×
UNCOV
673
        return pBuffCur;
×
674
      }
675
    }
UNCOV
676
    pBuffCur->buffIndex = 0;
×
UNCOV
677
  } else if (taosArrayGetSize(pWinStates) > 0) {
×
UNCOV
678
    pBuffCur = createStateCursor(pFileState);
×
UNCOV
679
    if (pBuffCur == NULL) {
×
680
      return NULL;
×
681
    }
UNCOV
682
    pBuffCur->buffIndex = 0;
×
683
  }
684

UNCOV
685
  void*            pFileStore = getStateFileStore(pFileState);
×
UNCOV
686
  SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
×
UNCOV
687
  if (pCur) {
×
688
    pCur->pStreamFileState = pFileState;
×
689
    SSessionKey key = {0};
×
690
    void*       pVal = NULL;
×
691
    int         len = 0;
×
692
    int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &len);
×
693
    if (code == TSDB_CODE_FAILED) {
×
694
      streamStateFreeCur(pCur);
×
695
      return pBuffCur;
×
696
    }
697
    winCount = *((COUNT_TYPE*)((char*)pVal + (resSize - sizeof(COUNT_TYPE))));
×
698
    taosMemoryFreeClear(pVal);
×
699
    streamStateFreeCur(pBuffCur);
×
700
    if (sessionRangeKeyCmpr(pWinKey, &key) != 0 && winCount == count) {
×
701
      streamStateCurNext(pFileStore, pCur);
×
702
      return pCur;
×
703
    }
704
    streamStateCurPrev(pFileStore, pCur);
×
705
    while (1) {
706
      code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len);
×
707
      if (code == TSDB_CODE_FAILED) {
×
708
        streamStateCurNext(pFileStore, pCur);
×
709
        return pCur;
×
710
      }
711
      winCount = *((COUNT_TYPE*)((char*)pVal + (resSize - sizeof(COUNT_TYPE))));
×
712
      taosMemoryFreeClear(pVal);
×
713
      if (sessionRangeKeyCmpr(pWinKey, &key) == 0 || winCount < count) {
×
714
        streamStateCurPrev(pFileStore, pCur);
×
715
      } else {
716
        streamStateCurNext(pFileStore, pCur);
×
717
        return pCur;
×
718
      }
719
    }
720
  }
UNCOV
721
  return pBuffCur;
×
722
}
723

UNCOV
724
int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
×
UNCOV
725
  if (!pCur) {
×
UNCOV
726
    return TSDB_CODE_FAILED;
×
727
  }
UNCOV
728
  int32_t code = TSDB_CODE_SUCCESS;
×
729

UNCOV
730
  SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState);
×
UNCOV
731
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
×
UNCOV
732
  SArray*    pWinStates = NULL;
×
UNCOV
733
  if (ppBuff) {
×
UNCOV
734
    pWinStates = (SArray*)(*ppBuff);
×
735
  }
736

UNCOV
737
  if (pCur->buffIndex >= 0) {
×
UNCOV
738
    int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
739
    if (pCur->buffIndex >= size) {
×
UNCOV
740
      return TSDB_CODE_FAILED;
×
741
    }
UNCOV
742
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
×
UNCOV
743
    if (pVal) {
×
UNCOV
744
      *pVal = pPos;
×
745
    }
UNCOV
746
    *pKey = *(SSessionKey*)(pPos->pKey);
×
747
  } else {
UNCOV
748
    void* pData = NULL;
×
UNCOV
749
    code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, &pData, pVLen);
×
UNCOV
750
    if (taosArrayGetSize(pWinStates) > 0 &&
×
UNCOV
751
        (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) {
×
UNCOV
752
      transformCursor(pCur->pStreamFileState, pCur);
×
UNCOV
753
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
×
UNCOV
754
      if (pVal) {
×
755
        *pVal = pPos;
×
756
      }
UNCOV
757
      *pKey = *(SSessionKey*)(pPos->pKey);
×
UNCOV
758
      code = TSDB_CODE_SUCCESS;
×
UNCOV
759
    } else if (code == TSDB_CODE_SUCCESS && pVal) {
×
UNCOV
760
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState);
×
UNCOV
761
      if (!pNewPos || !pNewPos->pRowBuff) {
×
762
        code = TSDB_CODE_OUT_OF_MEMORY;
×
763
        taosMemoryFreeClear(pData);
×
764
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
765
        return code;
×
766
      }
UNCOV
767
      memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
×
UNCOV
768
      pNewPos->needFree = true;
×
UNCOV
769
      pNewPos->beFlushed = true;
×
UNCOV
770
      memcpy(pNewPos->pRowBuff, pData, *pVLen);
×
UNCOV
771
      (*pVal) = pNewPos;
×
772
    }
UNCOV
773
    taosMemoryFreeClear(pData);
×
774
  }
UNCOV
775
  return code;
×
776
}
777

UNCOV
778
void sessionWinStateMoveToNext(SStreamStateCur* pCur) {
×
UNCOV
779
  qTrace("move cursor to next");
×
UNCOV
780
  if (pCur && pCur->buffIndex >= 0) {
×
UNCOV
781
    pCur->buffIndex++;
×
782
  } else {
UNCOV
783
    streamStateCurNext_rocksdb(pCur);
×
784
  }
UNCOV
785
}
×
786

787
void sessionWinStateMoveToPrev(SStreamStateCur* pCur) {
×
788
  qTrace("move cursor to prev");
×
789
  if (pCur && pCur->buffIndex >= 1) {
×
790
    pCur->buffIndex--;
×
791
  } else {
792
    streamStateCurPrev_rocksdb(pCur);
×
793
  }
794
}
×
795

UNCOV
796
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey,
×
797
                                     range_cmpr_fn cmpFn) {
UNCOV
798
  SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key);
×
UNCOV
799
  SSessionKey      tmpKey = *key;
×
UNCOV
800
  int32_t          code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
×
UNCOV
801
  bool             hasCurrentPrev = true;
×
UNCOV
802
  if (code == TSDB_CODE_FAILED) {
×
UNCOV
803
    streamStateFreeCur(pCur);
×
UNCOV
804
    pCur = sessionWinStateSeekKeyNext(pFileState, key);
×
UNCOV
805
    code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
×
UNCOV
806
    hasCurrentPrev = false;
×
807
  }
808

UNCOV
809
  if (code == TSDB_CODE_FAILED) {
×
UNCOV
810
    code = TSDB_CODE_FAILED;
×
UNCOV
811
    goto _end;
×
812
  }
813

UNCOV
814
  if (cmpFn(key, &tmpKey) == 0) {
×
UNCOV
815
    *curKey = tmpKey;
×
UNCOV
816
    goto _end;
×
UNCOV
817
  } else if (!hasCurrentPrev) {
×
UNCOV
818
    code = TSDB_CODE_FAILED;
×
UNCOV
819
    goto _end;
×
820
  }
821

UNCOV
822
  sessionWinStateMoveToNext(pCur);
×
UNCOV
823
  code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
×
UNCOV
824
  if (code == TSDB_CODE_SUCCESS && cmpFn(key, &tmpKey) == 0) {
×
UNCOV
825
    *curKey = tmpKey;
×
826
  } else {
UNCOV
827
    code = TSDB_CODE_FAILED;
×
828
  }
829

UNCOV
830
_end:
×
UNCOV
831
  streamStateFreeCur(pCur);
×
UNCOV
832
  return code;
×
833
}
834

UNCOV
835
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
×
836
                              state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
UNCOV
837
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
838
  int32_t lino = 0;
×
UNCOV
839
  (*pWinCode) = TSDB_CODE_SUCCESS;
×
840

UNCOV
841
  SSessionKey* pWinKey = key;
×
UNCOV
842
  TSKEY        gap = 0;
×
UNCOV
843
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
×
UNCOV
844
  SArray*      pWinStates = NULL;
×
845

UNCOV
846
  void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
×
UNCOV
847
  if (ppBuff) {
×
UNCOV
848
    pWinStates = (SArray*)(*ppBuff);
×
849
  } else {
UNCOV
850
    pWinStates = taosArrayInit(16, POINTER_BYTES);
×
UNCOV
851
    if (!pWinStates) {
×
852
      code = terrno;
×
853
      QUERY_CHECK_CODE(code, lino, _end);
×
854
    }
855

UNCOV
856
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
×
UNCOV
857
    QUERY_CHECK_CODE(code, lino, _end);
×
858
  }
859

UNCOV
860
  TSKEY startTs = pWinKey->win.skey;
×
UNCOV
861
  TSKEY endTs = pWinKey->win.ekey;
×
862

UNCOV
863
  int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
864
  if (size == 0) {
×
UNCOV
865
    void*   pFileStore = getStateFileStore(pFileState);
×
UNCOV
866
    void*   p = NULL;
×
UNCOV
867
    int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
×
UNCOV
868
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
×
UNCOV
869
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
×
UNCOV
870
      if (!(*pVal)) {
×
871
        code = TSDB_CODE_OUT_OF_MEMORY;
×
872
        QUERY_CHECK_CODE(code, lino, _end);
×
873
      }
874

UNCOV
875
      (*pWinCode) = code_file;
×
UNCOV
876
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
877
             pWinKey->win.ekey, code_file);
878
    } else {
UNCOV
879
      code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
×
UNCOV
880
      (*pWinCode) = TSDB_CODE_FAILED;
×
UNCOV
881
      taosMemoryFree(p);
×
UNCOV
882
      QUERY_CHECK_CODE(code, lino, _end);
×
883
    }
UNCOV
884
    goto _end;
×
885
  }
886

887
  // find the first position which is smaller than the pWinKey
UNCOV
888
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
×
UNCOV
889
  SRowBuffPos* pPos = NULL;
×
UNCOV
890
  int32_t      valSize = *pVLen;
×
891

UNCOV
892
  if (index >= 0) {
×
UNCOV
893
    pPos = taosArrayGetP(pWinStates, index);
×
UNCOV
894
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
×
UNCOV
895
    if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) {
×
UNCOV
896
      (*pVal) = pPos;
×
UNCOV
897
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
×
UNCOV
898
      pPos->beUsed = true;
×
UNCOV
899
      pPos->beFlushed = false;
×
UNCOV
900
      *key = *pDestWinKey;
×
UNCOV
901
      goto _end;
×
902
    }
903
  }
904

UNCOV
905
  if (index + 1 < size) {
×
UNCOV
906
    pPos = taosArrayGetP(pWinStates, index + 1);
×
UNCOV
907
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
×
UNCOV
908
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ||
×
UNCOV
909
        fn(pKeyData, stateKey) == true) {
×
UNCOV
910
      (*pVal) = pPos;
×
UNCOV
911
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
×
UNCOV
912
      pPos->beUsed = true;
×
UNCOV
913
      pPos->beFlushed = false;
×
UNCOV
914
      *key = *pDestWinKey;
×
UNCOV
915
      goto _end;
×
916
    }
917
  }
918

UNCOV
919
  if (index + 1 == 0) {
×
UNCOV
920
    if (!isDeteled(pFileState, endTs)) {
×
UNCOV
921
      void*   p = NULL;
×
UNCOV
922
      void*   pFileStore = getStateFileStore(pFileState);
×
923
      int32_t code_file =
UNCOV
924
          streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
×
UNCOV
925
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
×
UNCOV
926
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
×
UNCOV
927
        if (!(*pVal)) {
×
928
          code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
929
          QUERY_CHECK_CODE(code, lino, _end);
×
930
        }
931

UNCOV
932
        (*pWinCode) = code_file;
×
UNCOV
933
        qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
934
               pWinKey->win.ekey, code_file);
UNCOV
935
        goto _end;
×
936
      } else {
UNCOV
937
        taosMemoryFree(p);
×
938
      }
939
    }
940
  }
941

UNCOV
942
  if (index == size - 1) {
×
UNCOV
943
    code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
×
UNCOV
944
    QUERY_CHECK_CODE(code, lino, _end);
×
945

UNCOV
946
    (*pWinCode) = TSDB_CODE_FAILED;
×
UNCOV
947
    goto _end;
×
948
  }
UNCOV
949
  code = insertNewSessionWindow(pFileState, pWinStates, key, index + 1, (SRowBuffPos**)pVal);
×
UNCOV
950
  QUERY_CHECK_CODE(code, lino, _end);
×
951

UNCOV
952
  (*pWinCode) = TSDB_CODE_FAILED;
×
953

UNCOV
954
_end:
×
UNCOV
955
  if (code != TSDB_CODE_SUCCESS) {
×
956
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
957
  }
UNCOV
958
  return code;
×
959
}
960

UNCOV
961
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
×
UNCOV
962
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
×
UNCOV
963
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
×
UNCOV
964
  streamStateFreeCur(pCur);
×
UNCOV
965
  if (code == TSDB_CODE_SUCCESS) {
×
966
    return code;
×
967
  } else {
UNCOV
968
    pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
×
969
  }
970

UNCOV
971
  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
×
UNCOV
972
  streamStateFreeCur(pCur);
×
UNCOV
973
  return code;
×
974
}
975

UNCOV
976
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
×
977
                              int32_t* pVLen, int32_t* pWinCount) {
UNCOV
978
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
979
  int32_t lino = 0;
×
UNCOV
980
  (*pWinCount) = TSDB_CODE_SUCCESS;
×
981

UNCOV
982
  SSessionKey* pWinKey = pKey;
×
UNCOV
983
  const TSKEY  gap = 0;
×
UNCOV
984
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
×
UNCOV
985
  SArray*      pWinStates = NULL;
×
UNCOV
986
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
×
UNCOV
987
  if (ppBuff) {
×
UNCOV
988
    pWinStates = (SArray*)(*ppBuff);
×
989
  } else {
UNCOV
990
    pWinStates = taosArrayInit(16, POINTER_BYTES);
×
UNCOV
991
    if (!pWinStates) {
×
992
      code = terrno;
×
993
      QUERY_CHECK_CODE(code, lino, _end);
×
994
    }
995

UNCOV
996
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
×
UNCOV
997
    QUERY_CHECK_CODE(code, lino, _end);
×
998
  }
999

UNCOV
1000
  TSKEY startTs = pWinKey->win.skey;
×
UNCOV
1001
  TSKEY endTs = pWinKey->win.ekey;
×
1002

UNCOV
1003
  int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
1004
  if (size == 0) {
×
UNCOV
1005
    void* pFileStore = getStateFileStore(pFileState);
×
UNCOV
1006
    void* pRockVal = NULL;
×
UNCOV
1007
    (*pWinCount) = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
×
UNCOV
1008
    if ((*pWinCount) == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
×
UNCOV
1009
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
1010
             pWinKey->win.ekey, (*pWinCount));
UNCOV
1011
      if ((*pWinCount) == TSDB_CODE_SUCCESS) {
×
UNCOV
1012
        int32_t     valSize = *pVLen;
×
UNCOV
1013
        COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
×
UNCOV
1014
        if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
×
UNCOV
1015
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
×
UNCOV
1016
          if (!(*pVal)) {
×
1017
            code = TSDB_CODE_OUT_OF_MEMORY;
×
1018
            QUERY_CHECK_CODE(code, lino, _end);
×
1019
          }
1020

UNCOV
1021
          goto _end;
×
1022
        }
1023
      }
UNCOV
1024
      pWinKey->win.skey = startTs;
×
UNCOV
1025
      pWinKey->win.ekey = endTs;
×
UNCOV
1026
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
×
UNCOV
1027
      taosMemoryFree(pRockVal);
×
UNCOV
1028
      if (!(*pVal)) {
×
1029
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1030
        QUERY_CHECK_CODE(code, lino, _end);
×
1031
      }
1032
    } else {
UNCOV
1033
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
×
UNCOV
1034
      QUERY_CHECK_CODE(code, lino, _end);
×
1035

UNCOV
1036
      (*pWinCount) = TSDB_CODE_FAILED;
×
1037
    }
UNCOV
1038
    goto _end;
×
1039
  }
1040

1041
  // find the first position which is smaller than the pWinKey
UNCOV
1042
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
×
UNCOV
1043
  SRowBuffPos* pPos = NULL;
×
UNCOV
1044
  int32_t      valSize = *pVLen;
×
1045

UNCOV
1046
  if (index >= 0) {
×
UNCOV
1047
    pPos = taosArrayGetP(pWinStates, index);
×
UNCOV
1048
    COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pPos->pRowBuff) + (valSize - sizeof(COUNT_TYPE)));
×
UNCOV
1049
    if (inSessionWindow(pPos->pKey, startTs, gap) || (index == size - 1 && (*pWinStateCout) < winCount)) {
×
UNCOV
1050
      (*pVal) = pPos;
×
UNCOV
1051
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
×
UNCOV
1052
      pPos->beUsed = true;
×
UNCOV
1053
      pPos->beFlushed = false;
×
UNCOV
1054
      *pWinKey = *pDestWinKey;
×
UNCOV
1055
      goto _end;
×
1056
    }
1057
  }
1058

UNCOV
1059
  if (index == -1) {
×
UNCOV
1060
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) {
×
1061
      SSessionKey tmpKey = *pWinKey;
×
1062
      void*       pRockVal = NULL;
×
1063
      void*       pFileStore = getStateFileStore(pFileState);
×
1064
      int32_t     code_file = getCountWinStateFromDisc(pFileStore, &tmpKey, &pRockVal, pVLen);
×
1065
      if (code_file == TSDB_CODE_SUCCESS) {
×
1066
        SRowBuffPos* pFirstPos = taosArrayGetP(pWinStates, 0);
×
1067
        SSessionKey* pFirstWinKey = (SSessionKey*)pFirstPos->pKey;
×
1068
        if (tmpKey.win.ekey < pFirstWinKey->win.skey) {
×
1069
          *pWinKey = tmpKey;
×
1070
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
×
1071
          if (!(*pVal)) {
×
1072
            code = TSDB_CODE_OUT_OF_MEMORY;
×
1073
            QUERY_CHECK_CODE(code, lino, _end);
×
1074
          }
1075

1076
          (*pWinCount) = code_file;
×
1077
          qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
1078
                 pWinKey->win.ekey, code_file);
1079
          goto _end;
×
1080
        }
1081
      }
1082
      taosMemoryFree(pRockVal);
×
1083
    }
1084
  }
1085

UNCOV
1086
  if (index + 1 < size) {
×
UNCOV
1087
    pPos = taosArrayGetP(pWinStates, index + 1);
×
UNCOV
1088
    (*pVal) = pPos;
×
UNCOV
1089
    SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
×
UNCOV
1090
    pPos->beUsed = true;
×
UNCOV
1091
    pPos->beFlushed = false;
×
UNCOV
1092
    *pWinKey = *pDestWinKey;
×
UNCOV
1093
    goto _end;
×
1094
  }
1095

UNCOV
1096
  code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
×
UNCOV
1097
  QUERY_CHECK_CODE(code, lino, _end);
×
1098

UNCOV
1099
  (*pWinCount) = TSDB_CODE_FAILED;
×
1100

UNCOV
1101
_end:
×
UNCOV
1102
  if (code != TSDB_CODE_SUCCESS) {
×
1103
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1104
  }
UNCOV
1105
  return code;
×
1106
}
1107

UNCOV
1108
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
×
1109
                                 int32_t* pVLen) {
UNCOV
1110
  SSessionKey* pWinKey = pKey;
×
UNCOV
1111
  const TSKEY  gap = 0;
×
UNCOV
1112
  int32_t      code = TSDB_CODE_SUCCESS;
×
UNCOV
1113
  int32_t      lino = 0;
×
UNCOV
1114
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
×
UNCOV
1115
  SArray*      pWinStates = NULL;
×
UNCOV
1116
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
×
UNCOV
1117
  if (ppBuff) {
×
UNCOV
1118
    pWinStates = (SArray*)(*ppBuff);
×
1119
  } else {
UNCOV
1120
    pWinStates = taosArrayInit(16, POINTER_BYTES);
×
UNCOV
1121
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
×
UNCOV
1122
    QUERY_CHECK_CODE(code, lino, _end);
×
1123
  }
1124

UNCOV
1125
  TSKEY startTs = pWinKey->win.skey;
×
UNCOV
1126
  TSKEY endTs = pWinKey->win.ekey;
×
1127

UNCOV
1128
  int32_t size = taosArrayGetSize(pWinStates);
×
UNCOV
1129
  if (size == 0) {
×
UNCOV
1130
    void* pFileStore = getStateFileStore(pFileState);
×
UNCOV
1131
    void* pRockVal = NULL;
×
1132

UNCOV
1133
    int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
×
UNCOV
1134
    if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) {
×
1135
      int32_t     valSize = *pVLen;
×
1136
      COUNT_TYPE* pWinStateCount = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
×
1137
      if ((*pWinStateCount) == winCount) {
×
1138
        code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
×
1139
        QUERY_CHECK_CODE(code, lino, _end);
×
1140
      } else {
1141
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
×
1142
        if (!(*pVal)) {
×
1143
          code = TSDB_CODE_OUT_OF_MEMORY;
×
1144
          QUERY_CHECK_CODE(code, lino, _end);
×
1145
        }
1146
        qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
1147
               pWinKey->win.ekey, code_file);
1148
      }
1149
    } else {
UNCOV
1150
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
×
UNCOV
1151
      taosMemoryFree(pRockVal);
×
UNCOV
1152
      QUERY_CHECK_CODE(code, lino, _end);
×
1153
    }
1154
  } else {
UNCOV
1155
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
×
UNCOV
1156
    QUERY_CHECK_CODE(code, lino, _end);
×
1157
  }
1158

UNCOV
1159
_end:
×
UNCOV
1160
  if (code != TSDB_CODE_SUCCESS) {
×
1161
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1162
  }
UNCOV
1163
  return code;
×
1164
}
1165

UNCOV
1166
static int compareRangeKey(const void* pKey, const void* data, int index) {
×
UNCOV
1167
  SScanRange* pRange1 = (SScanRange*) pKey;
×
UNCOV
1168
  SScanRange* pRange2 = taosArrayGet((SArray*)data, index);
×
UNCOV
1169
  if (pRange1->win.skey > pRange2->win.skey) {
×
UNCOV
1170
    return 1;
×
1171
  } else if (pRange1->win.skey < pRange2->win.skey) {
×
1172
    return -1;
×
1173
  }
1174

1175
  if (pRange1->win.ekey > pRange2->win.ekey) {
×
1176
    return 1;
×
1177
  } else if (pRange1->win.ekey < pRange2->win.ekey) {
×
1178
    return -1;
×
1179
  }
1180

1181
  return 0;
×
1182
}
1183

UNCOV
1184
static int32_t scanRangeKeyCmpr(const SScanRange* pRange1, const SScanRange* pRange2) {
×
UNCOV
1185
  if (pRange1->win.skey > pRange2->win.ekey) {
×
1186
    return 1;
×
UNCOV
1187
  } else if (pRange1->win.ekey < pRange2->win.skey) {
×
UNCOV
1188
    return -1;
×
1189
  }
1190

1191
  return 0;
×
1192
}
1193

UNCOV
1194
static int32_t putRangeIdInfo(SScanRange* pRangeKey, uint64_t gpId, uint64_t uId) {
×
UNCOV
1195
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1196
  int32_t lino = 0;
×
1197

UNCOV
1198
  code = tSimpleHashPut(pRangeKey->pUIds, &uId, sizeof(uint64_t), NULL, 0);
×
UNCOV
1199
  QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1200
  code = tSimpleHashPut(pRangeKey->pGroupIds, &gpId, sizeof(uint64_t), NULL, 0);
×
UNCOV
1201
  QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1202
_end:
×
UNCOV
1203
  if (code != TSDB_CODE_SUCCESS) {
×
1204
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1205
  }
UNCOV
1206
  return code;
×
1207
}
1208

1209
void mergeRangeKey(SScanRange* pRangeDest, SScanRange* pRangeSrc) {
×
1210
  pRangeDest->win.skey = TMIN(pRangeDest->win.skey, pRangeSrc->win.skey);
×
1211
  pRangeDest->win.ekey = TMAX(pRangeDest->win.ekey, pRangeSrc->win.ekey);
×
1212
  pRangeDest->calWin.skey = TMIN(pRangeDest->calWin.skey, pRangeSrc->calWin.skey);
×
1213
  pRangeDest->calWin.ekey = TMAX(pRangeDest->calWin.ekey, pRangeSrc->calWin.ekey);
×
1214
}
×
1215

UNCOV
1216
int32_t mergeScanRange(SArray* pRangeArray, SScanRange* pRangeKey, uint64_t gpId, uint64_t uId, int32_t* pIndex, bool* pRes, char* idStr) {
×
UNCOV
1217
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1218
  int32_t lino = 0;
×
UNCOV
1219
  int32_t size = taosArrayGetSize(pRangeArray);
×
UNCOV
1220
  int32_t index = binarySearch(pRangeArray, size, pRangeKey, compareRangeKey);
×
UNCOV
1221
  if (index >= 0) {
×
UNCOV
1222
    SScanRange* pFindRangeKey = (SScanRange*) taosArrayGet(pRangeArray, index);
×
UNCOV
1223
    if (scanRangeKeyCmpr(pFindRangeKey, pRangeKey) == 0) {
×
1224
      mergeRangeKey(pFindRangeKey, pRangeKey);
×
1225
      code = putRangeIdInfo(pFindRangeKey, gpId, uId);
×
1226
      QUERY_CHECK_CODE(code, lino, _end);
×
1227
      *pRes = true;
×
1228
      goto _end;
×
1229
    }
1230
  }
UNCOV
1231
  if (index + 1 < size) {
×
1232
    SScanRange* pFindRangeKey = (SScanRange*) taosArrayGet(pRangeArray, index + 1);
×
1233
    if (scanRangeKeyCmpr(pFindRangeKey, pRangeKey) == 0) {
×
1234
      mergeRangeKey(pFindRangeKey, pRangeKey);
×
1235
      code = putRangeIdInfo(pFindRangeKey, gpId, uId);
×
1236
      QUERY_CHECK_CODE(code, lino, _end);
×
1237
      *pRes = true;
×
1238
      goto _end;
×
1239
    }
1240
  }
UNCOV
1241
  (*pIndex) = index;
×
UNCOV
1242
  *pRes = false;
×
1243

UNCOV
1244
_end:
×
UNCOV
1245
  qDebug("===stream===%s mergeScanRange start ts:%" PRId64 ",end ts:%" PRId64 ",group id:%" PRIu64 ", uid:%" PRIu64
×
1246
         ", res:%d", idStr, pRangeKey->win.skey, pRangeKey->win.ekey, gpId, uId, *pRes);
UNCOV
1247
  if (code != TSDB_CODE_SUCCESS) {
×
1248
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1249
  }
UNCOV
1250
  return code;
×
1251
}
1252

UNCOV
1253
int32_t mergeAndSaveScanRange(STableTsDataState* pTsDataState, STimeWindow* pWin, uint64_t gpId, SRecDataInfo* pRecData, int32_t recLen) {
×
UNCOV
1254
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1255
  int32_t lino = 0;
×
1256

UNCOV
1257
  SArray* pRangeArray = pTsDataState->pScanRanges;
×
UNCOV
1258
  uint64_t uId = pRecData->tableUid;
×
1259

UNCOV
1260
  int32_t index = 0;
×
UNCOV
1261
  bool merge = false;
×
UNCOV
1262
  SScanRange rangeKey = {.win = *pWin, .calWin = pRecData->calWin, .pUIds = NULL, .pGroupIds = NULL};
×
UNCOV
1263
  code = mergeScanRange(pRangeArray, &rangeKey, gpId, uId, &index, &merge, pTsDataState->pState->pTaskIdStr);
×
UNCOV
1264
  QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1265
  if (merge == true) {
×
1266
    goto _end;
×
1267
  }
1268

UNCOV
1269
  int32_t size = taosArrayGetSize(pRangeArray);
×
UNCOV
1270
  if (size > MAX_SCAN_RANGE_SIZE) {
×
1271
    SSessionKey sesKey = {.win = *pWin, .groupId = gpId};
×
1272
    void* pVal = NULL;
×
1273
    int32_t len = 0;
×
1274
    int32_t winCode = streamStateSessionGet_rocksdb(pTsDataState->pState, &sesKey, &pVal, &len);
×
1275
    if (winCode != TSDB_CODE_SUCCESS) {
×
1276
      code = streamStateSessionPut_rocksdb(pTsDataState->pState, &sesKey, &uId, sizeof(uint64_t));
×
1277
    } else {
1278
      char* pTempBuf = taosMemoryRealloc(pVal, len + sizeof(uint64_t));
×
1279
      QUERY_CHECK_NULL(pTempBuf, code, lino, _end, terrno);
×
1280
      memcpy(pTempBuf+len, &uId, sizeof(uint64_t));
×
1281
      code = streamStateSessionPut_rocksdb(pTsDataState->pState, &sesKey, pTempBuf, len + sizeof(uint64_t));
×
1282
      taosMemFreeClear(pTempBuf);
×
1283
    }
1284
    QUERY_CHECK_CODE(code, lino, _end);
×
1285
    goto _end;
×
1286
  }
UNCOV
1287
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
UNCOV
1288
  rangeKey.pGroupIds = tSimpleHashInit(8, hashFn);
×
UNCOV
1289
  rangeKey.pUIds = tSimpleHashInit(8, hashFn);
×
UNCOV
1290
  code = putRangeIdInfo(&rangeKey, gpId, uId);
×
UNCOV
1291
  QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1292
  index++;
×
UNCOV
1293
  taosArrayInsert(pRangeArray, index, &rangeKey);
×
1294

UNCOV
1295
_end:
×
UNCOV
1296
  if (code != TSDB_CODE_SUCCESS) {
×
1297
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1298
  }
UNCOV
1299
  return code;
×
1300
}
1301

1302
int32_t mergeSimpleHashMap(SSHashObj* pDest, SSHashObj* pSource) {
×
1303
  int32_t code = TSDB_CODE_SUCCESS;
×
1304
  int32_t lino = 0;
×
1305
  void*   pIte = NULL;
×
1306
  int32_t iter = 0;
×
1307
  while ((pIte = tSimpleHashIterate(pSource, pIte, &iter)) != NULL) {
×
1308
    size_t keyLen = 0;
×
1309
    void* pKey = tSimpleHashGetKey(pIte, &keyLen);
×
1310
    code = tSimpleHashPut(pDest, pKey, keyLen, pIte, sizeof(uint64_t));
×
1311
    QUERY_CHECK_CODE(code, lino, _end);
×
1312
  }
1313
  tSimpleHashCleanup(pSource);
×
1314

1315
_end:
×
1316
  if (code != TSDB_CODE_SUCCESS) {
×
1317
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1318
  }
1319
  return code;
×
1320
}
1321

UNCOV
1322
int32_t mergeAllScanRange(STableTsDataState* pTsDataState) {
×
UNCOV
1323
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1324
  int32_t lino = 0;
×
UNCOV
1325
  SStreamStateCur* pCur = NULL;
×
UNCOV
1326
  SArray* pRangeArray = pTsDataState->pScanRanges;
×
UNCOV
1327
  if (taosArrayGetSize(pRangeArray) < 2) {
×
UNCOV
1328
    return code;
×
1329
  }
1330

UNCOV
1331
  for (int32_t i = 0; i < taosArrayGetSize(pRangeArray) - 1;) {
×
UNCOV
1332
    SScanRange* pCurRange = taosArrayGet(pRangeArray, i);
×
UNCOV
1333
    SScanRange* pNextRange = taosArrayGet(pRangeArray, i + 1);
×
UNCOV
1334
    if (scanRangeKeyCmpr(pCurRange, pNextRange) == 0) {
×
1335
      pCurRange->win.skey = TMIN(pCurRange->win.skey, pNextRange->win.skey);
×
1336
      pCurRange->win.ekey = TMAX(pCurRange->win.ekey, pNextRange->win.ekey);
×
1337
      code = mergeSimpleHashMap(pCurRange->pGroupIds, pNextRange->pGroupIds);
×
1338
      QUERY_CHECK_CODE(code, lino, _end);
×
1339
      code = mergeSimpleHashMap(pCurRange->pUIds, pNextRange->pUIds);
×
1340
      QUERY_CHECK_CODE(code, lino, _end);
×
1341
      taosArrayRemove(pRangeArray, i+1);
×
1342
      continue;
×
1343
    }
UNCOV
1344
    i++;
×
1345
  }
1346

UNCOV
1347
  int32_t winRes = TSDB_CODE_SUCCESS;
×
UNCOV
1348
  pCur = streamStateSessionSeekToLast_rocksdb(pTsDataState->pState, INT64_MAX);
×
UNCOV
1349
  while (winRes == TSDB_CODE_SUCCESS) {
×
UNCOV
1350
    void*       pVal = NULL;
×
UNCOV
1351
    int32_t     vlen = 0;
×
UNCOV
1352
    SSessionKey key = {0};
×
UNCOV
1353
    winRes = streamStateSessionGetKVByCur_rocksdb(pTsDataState->pState, pCur, &key, &pVal, &vlen);
×
UNCOV
1354
    if (winRes != TSDB_CODE_SUCCESS) {
×
UNCOV
1355
      break;
×
1356
    }
1357
    int32_t index = 0;
×
1358
    bool merge = false;
×
1359
    SScanRange tmpRange = {.win = key.win, .pUIds = NULL, .pGroupIds = NULL};
×
1360
    int32_t num = vlen / sizeof(uint64_t);
×
1361
    uint64_t* pUids = (uint64_t*) pVal;
×
1362
    for (int32_t i = 0; i < num; i++) {
×
1363
      code = mergeScanRange(pRangeArray, &tmpRange, key.groupId, pUids[i], &index, &merge, pTsDataState->pState->pTaskIdStr);
×
1364
      QUERY_CHECK_CODE(code, lino, _end);
×
1365
    }
1366
    if (merge == true) {
×
1367
      code = streamStateSessionDel_rocksdb(pTsDataState->pState, &key);
×
1368
      QUERY_CHECK_CODE(code, lino, _end);
×
1369
    }
1370
  }
1371

1372
_end:
×
UNCOV
1373
  streamStateFreeCur(pCur);
×
UNCOV
1374
  if (code != TSDB_CODE_SUCCESS) {
×
1375
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1376
  }
UNCOV
1377
  return code;
×
1378
}
1379

UNCOV
1380
int32_t popScanRange(STableTsDataState* pTsDataState, SScanRange* pRange) {
×
UNCOV
1381
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1382
  int32_t lino = 0;
×
UNCOV
1383
  SStreamStateCur* pCur = NULL;
×
UNCOV
1384
  SArray* pRangeArray = pTsDataState->pScanRanges;
×
UNCOV
1385
  if (taosArrayGetSize(pRangeArray) > 0) {
×
UNCOV
1386
    (*pRange) = *(SScanRange*) taosArrayGet(pRangeArray, 0);
×
UNCOV
1387
    taosArrayRemove(pRangeArray, 0);
×
UNCOV
1388
    goto _end;
×
1389
  }
1390

1391
  {
UNCOV
1392
    pCur = streamStateSessionSeekToLast_rocksdb(pTsDataState->pState, INT64_MAX);
×
UNCOV
1393
    SRecDataInfo* pRecVal = NULL;
×
UNCOV
1394
    int32_t       vlen = 0;
×
UNCOV
1395
    SSessionKey   key = {0};
×
UNCOV
1396
    int32_t winRes = streamStateSessionGetKVByCur_rocksdb(pTsDataState->pState, pCur, &key, (void**)&pRecVal, &vlen);
×
UNCOV
1397
    if (winRes != TSDB_CODE_SUCCESS) {
×
UNCOV
1398
      goto _end;
×
1399
    }
1400
    qDebug("===stream===get scan range from disc start ts:%" PRId64 ",end ts:%" PRId64 ",group id:%" PRIu64,
×
1401
           key.win.skey, key.win.ekey, key.groupId);
1402

1403
    pRange->win = key.win;
×
1404
    pRange->calWin = pRecVal->calWin;
×
1405
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
1406
    if (pRange->pGroupIds == NULL) {
×
1407
      pRange->pGroupIds = tSimpleHashInit(8, hashFn);
×
1408
    }
1409
    if (pRange->pUIds == NULL) {
×
1410
      pRange->pUIds = tSimpleHashInit(8, hashFn);
×
1411
    }
1412
    code = putRangeIdInfo(pRange, key.groupId, pRecVal->tableUid);
×
1413
    QUERY_CHECK_CODE(code, lino, _end);
×
1414
    code = streamStateSessionDel_rocksdb(pTsDataState->pState, &key);
×
1415
    QUERY_CHECK_CODE(code, lino, _end);
×
1416
  }
1417

UNCOV
1418
_end:
×
UNCOV
1419
  streamStateFreeCur(pCur);
×
UNCOV
1420
  if (code != TSDB_CODE_SUCCESS) {
×
1421
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1422
  }
UNCOV
1423
  return code;
×
1424
}
1425

1426
int32_t getNLastSessionStateKVByCur(SStreamStateCur* pCur, int32_t num, SArray* pRes) {
×
1427
  int32_t code = TSDB_CODE_SUCCESS;
×
1428
  int32_t lino = 0;
×
1429
  if (pCur->pHashData == NULL) {
×
1430
    return TSDB_CODE_FAILED;
×
1431
  }
1432
  SArray*  pWinStates = *((void**)pCur->pHashData);
×
1433
  int32_t size = taosArrayGetSize(pWinStates);
×
1434
  if (size == 0) {
×
1435
    return TSDB_CODE_FAILED;
×
1436
  }
1437

1438
  int32_t i = TMAX(size - num, 0);
×
1439

1440
  for ( ; i < size; i++) {
×
1441
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
×
1442
    QUERY_CHECK_NULL(pPos, code, lino, _end, terrno);
×
1443

1444
    SResultWindowInfo winInfo = {0};
×
1445
    winInfo.pStatePos = pPos;
×
1446
    winInfo.sessionWin = *(SSessionKey*)pPos->pKey;
×
1447

1448
    void* pTempRes = taosArrayPush(pRes, &winInfo);
×
1449
    QUERY_CHECK_NULL(pTempRes, code, lino, _end, terrno);
×
1450
  }
1451

1452
_end:
×
1453
  if (code != TSDB_CODE_SUCCESS) {
×
1454
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1455
  }
1456
  return code;
×
1457
}
1458

1459
bool hasSessionState(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, bool* pIsLast) {
×
1460
  SSHashObj*   pRowStateBuff = getRowStateBuff(pFileState);
×
1461
  void**       ppBuff = (void**)tSimpleHashGet(pRowStateBuff, &pKey->groupId, sizeof(uint64_t));
×
1462
  if (ppBuff == NULL) {
×
1463
    return false;
×
1464
  }
1465
  SArray*      pWinStates = (SArray*)(*ppBuff);
×
1466
  int32_t      size = taosArrayGetSize(pWinStates);
×
1467
  int32_t      index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
×
1468
  SRowBuffPos* pPos = NULL;
×
1469

1470
  if (index >= 0) {
×
1471
    pPos = taosArrayGetP(pWinStates, index);
×
1472
    if (inSessionWindow(pPos->pKey, pKey->win.skey, gap)) {
×
1473
      *pKey = *((SSessionKey*)pPos->pKey);
×
1474
      *pIsLast = (index == size - 1);
×
1475
      return true;
×
1476
    }
1477
  }
1478

1479
  if (index + 1 < size) {
×
1480
    pPos = taosArrayGetP(pWinStates, index + 1);
×
1481
    if (inSessionWindow(pPos->pKey, pKey->win.skey, gap) ||
×
1482
        (pKey->win.ekey != INT64_MIN && inSessionWindow(pPos->pKey, pKey->win.ekey, gap))) {
×
1483
      *pKey = *((SSessionKey*)pPos->pKey);
×
1484
      *pIsLast = (index + 1 == size - 1);
×
1485
      return true;
×
1486
    }
1487
  }
1488
  return false;
×
1489
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc