• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.62
/source/libs/stream/src/tstreamFileState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "taos.h"
21
#include "tcommon.h"
22
#include "tcompare.h"
23
#include "thash.h"
24
#include "tsimplehash.h"
25

26
#define FLUSH_RATIO                    0.5
27
#define FLUSH_NUM                      4
28
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
29
#define MIN_NUM_OF_ROW_BUFF            10240
30
#define MIN_NUM_OF_RECOVER_ROW_BUFF    128
31
#define MIN_NUM_SEARCH_BUCKET          128
32
#define MAX_ARRAY_SIZE                 1024
33
#define MAX_GROUP_ID_NUM               200000
34
#define NUM_OF_CACHE_WIN               64
35
#define MAX_NUM_OF_CACHE_WIN           128
36
#define MIN_NUM_OF_SORT_CACHE_WIN      40960
37
#define BATCH_LIMIT                    256
38

39
#define DEFAULT_STATE_MAP_CAPACITY 10240
40
#define MAX_STATE_MAP_SIZE         10240000
41

42
#define SET_TSDATA_FLAG(ptr, len)   ((*(char*)POINTER_SHIFT(ptr, (len - 1))) |= 1)
43
#define UNSET_TSDATA_FLAG(ptr, len) ((*(char*)POINTER_SHIFT(ptr, (len - 1))) &= 0)
44
#define HAS_TSDATA_FLAG(ptr, len)   ((*(char*)POINTER_SHIFT(ptr, (len - 1))) & 1)
45

46
#define TASK_KEY               "streamFileState"
47
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
48

49
struct SStreamFileState {
50
  SList*     usedBuffs;
51
  SList*     freeBuffs;
52
  void*      rowStateBuff;
53
  void*      pFileStore;
54
  int32_t    rowSize;
55
  int32_t    selectivityRowSize;
56
  int32_t    keyLen;
57
  uint64_t   preCheckPointVersion;
58
  uint64_t   checkPointVersion;
59
  TSKEY      maxTs;
60
  TSKEY      deleteMark;
61
  TSKEY      flushMark;
62
  uint64_t   maxRowCount;
63
  uint64_t   curRowCount;
64
  GetTsFun   getTs;
65
  char*      id;
66
  char*      cfName;
67
  void*      searchBuff;
68
  SSHashObj* pGroupIdMap;
69
  bool       hasFillCatch;
70
  SSHashObj* pRecFlagMap;
71

72
  _state_buff_cleanup_fn         stateBuffCleanupFn;
73
  _state_buff_remove_fn          stateBuffRemoveFn;
74
  _state_buff_remove_by_pos_fn   stateBuffRemoveByPosFn;
75
  _state_buff_create_statekey_fn stateBuffCreateStateKeyFn;
76

77
  _state_file_remove_fn stateFileRemoveFn;
78
  _state_file_get_fn    stateFileGetFn;
79

80
  _state_fun_get_fn stateFunctionGetFn;
81
};
82

83
typedef SRowBuffPos SRowBuffInfo;
84

85
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
5,159✔
86
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
5,159✔
87
  return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
5,159✔
88
}
89

90
int fillTSKeyCompare(const void* pKey1, const void* pDatas, int pos) {
3,882✔
91
  SWinKey* pWin1 = (SWinKey*)pKey1;
3,882✔
92
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
3,882✔
93
  if (pWin1->ts > pWin2->ts) {
3,882✔
94
    return 1;
1,292✔
95
  } else if (pWin1->ts < pWin2->ts) {
2,590✔
96
    return -1;
751✔
97
  }
98

99
  return 0;
1,839✔
100
}
101

102
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
4,055✔
103
  SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
4,055✔
104
  if (pos) {
4,058✔
105
    (*pos)->beFlushed = true;
2,409✔
106
    (*pos)->invalid = true;
2,409✔
107
  }
108
  return tSimpleHashRemove(pBuff, pKey, keyLen);
4,058✔
109
}
110

111
void stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
6,390,333✔
112
  size_t        keyLen = pFileState->keyLen;
6,390,333✔
113
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
6,390,333✔
114
  if (ppPos) {
6,390,333✔
115
    if ((*ppPos) == pPos) {
6,390,251!
116
      int32_t tmpRes = tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
6,390,251✔
117
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
6,390,251✔
118
    }
119
  }
120
}
6,390,333✔
121

UNCOV
122
void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); }
×
123

124
void stateHashBuffCleanupFn(void* pBuff) { tSimpleHashCleanup(pBuff); }
4,392✔
125

126
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
3,574✔
127
  return streamStateDel_rocksdb(pFileState->pFileStore, pKey);
3,574✔
128
}
129

130
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
280,921✔
131
  return streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
280,921✔
132
}
133

134
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
2,030,620✔
135
  SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey));
2,030,620!
136
  if (pStateKey == NULL) {
2,030,645!
UNCOV
137
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
138
    return NULL;
×
139
  }
140
  SWinKey* pWinKey = pPos->pKey;
2,030,645✔
141
  pStateKey->key = *pWinKey;
2,030,645✔
142
  pStateKey->opNum = num;
2,030,645✔
143
  return pStateKey;
2,030,645✔
144
}
145

146
void* defaultCreateStateKey(SRowBuffPos* pPos, int64_t num) {
18✔
147
  SWinKey* pStateKey = taosMemoryCalloc(1, sizeof(SWinKey));
18!
148
  if (pStateKey == NULL) {
18!
UNCOV
149
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
150
    return NULL;
×
151
  }
152
  SWinKey* pWinKey = pPos->pKey;
18✔
153
  *pStateKey = *pWinKey;
18✔
154
  return pStateKey;
18✔
155
}
156

157
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
2,084✔
158
  return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
2,084✔
159
}
160

161
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
335✔
162
  return streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
335✔
163
}
164

165
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
1,030✔
166
  SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey));
1,030!
167
  if (pStateKey == NULL) {
1,030!
UNCOV
168
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
169
    return NULL;
×
170
  }
171
  SSessionKey* pWinKey = pPos->pKey;
1,030✔
172
  pStateKey->key = *pWinKey;
1,030✔
173
  pStateKey->opNum = num;
1,030✔
174
  return pStateKey;
1,030✔
175
}
176

177
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); }
34!
178

179
static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
1,172✔
180
  *pLen = sizeof(TSKEY);
1,172✔
181
  (*pVal) = taosMemoryCalloc(1, *pLen);
1,172!
182
  if ((*pVal) == NULL) {
1,172!
UNCOV
183
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
184
    return terrno;
×
185
  }
186
  void*   buff = *pVal;
1,172✔
187
  int32_t tmp = taosEncodeFixedI64(&buff, *pKey);
1,172!
188
  return TSDB_CODE_SUCCESS;
1,172✔
189
}
190

191
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
6,186✔
192
                            void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type,
193
                            SStreamFileState** ppFileState) {
194
  int32_t code = TSDB_CODE_SUCCESS;
6,186✔
195
  int32_t lino = 0;
6,186✔
196
  if (memSize <= 0) {
6,186!
UNCOV
197
    memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
×
198
  }
199
  if (rowSize == 0) {
6,186!
UNCOV
200
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
201
    QUERY_CHECK_CODE(code, lino, _end);
×
202
  }
203

204
  SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
6,186!
205
  QUERY_CHECK_NULL(pFileState, code, lino, _end, terrno);
6,187!
206

207
  rowSize += selectRowSize;
6,187✔
208
  pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
6,187✔
209
  pFileState->usedBuffs = tdListNew(POINTER_BYTES);
6,187✔
210
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
6,187!
211

212
  pFileState->freeBuffs = tdListNew(POINTER_BYTES);
6,187✔
213
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
6,189!
214

215
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
6,189✔
216
  int32_t    cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
6,188✔
217
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
6,188✔
218
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
4,130✔
219
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
4,128✔
220
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
4,128✔
221
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
4,128✔
222
    pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
4,128✔
223

224
    pFileState->stateFileRemoveFn = intervalFileRemoveFn;
4,128✔
225
    pFileState->stateFileGetFn = intervalFileGetFn;
4,128✔
226
    pFileState->cfName = taosStrdup("state");
4,128!
227
    pFileState->stateFunctionGetFn = addRowBuffIfNotExist;
4,128✔
228
  } else if (type == STREAM_STATE_BUFF_SORT) {
2,058✔
229
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
1,793✔
230
    pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
1,797✔
231
    pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
1,797✔
232
    pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn;
1,797✔
233
    pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
1,797✔
234

235
    pFileState->stateFileRemoveFn = sessionFileRemoveFn;
1,797✔
236
    pFileState->stateFileGetFn = sessionFileGetFn;
1,797✔
237
    pFileState->cfName = taosStrdup("sess");
1,797!
238
    pFileState->stateFunctionGetFn = getSessionRowBuff;
1,796✔
239
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
265!
240
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
265✔
241
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
265✔
242
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
265!
243
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
265✔
244
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
265✔
245
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
265✔
246
    pFileState->stateBuffCreateStateKeyFn = defaultCreateStateKey;
265✔
247

248
    pFileState->stateFileRemoveFn = hashSortFileRemoveFn;
265✔
249
    pFileState->stateFileGetFn = hashSortFileGetFn;
265✔
250
    pFileState->cfName = taosStrdup("fill");
265!
251
    pFileState->stateFunctionGetFn = NULL;
265✔
252
  }
253

254
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
6,189!
255
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
6,189!
256
  QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno);
6,189!
257
  QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno);
6,189!
258

259
  if (type == STREAM_STATE_BUFF_HASH_SEARCH) {
6,189✔
260
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
127✔
261
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
127!
262
  }
263

264
  pFileState->keyLen = keySize;
6,189✔
265
  pFileState->rowSize = rowSize;
6,189✔
266
  pFileState->selectivityRowSize = selectRowSize;
6,189✔
267
  pFileState->preCheckPointVersion = 0;
6,189✔
268
  pFileState->checkPointVersion = 1;
6,189✔
269
  pFileState->pFileStore = pFile;
6,189✔
270
  pFileState->getTs = fp;
6,189✔
271
  pFileState->curRowCount = 0;
6,189✔
272
  pFileState->deleteMark = delMark;
6,189✔
273
  pFileState->flushMark = INT64_MIN;
6,189✔
274
  pFileState->maxTs = INT64_MIN;
6,189✔
275
  pFileState->id = taosStrdup(taskId);
6,189!
276
  QUERY_CHECK_NULL(pFileState->id, code, lino, _end, terrno);
6,188!
277

278
  pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
6,188✔
279
  QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
6,187!
280

281
  pFileState->pRecFlagMap = tSimpleHashInit(1024, hashFn);
6,187✔
282
  QUERY_CHECK_NULL(pFileState->pRecFlagMap, code, lino, _end, terrno);
6,187!
283

284

285
  pFileState->hasFillCatch = true;
6,187✔
286

287
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
6,187✔
288
    code = recoverSnapshot(pFileState, checkpointId);
4,125✔
289
  } else if (type == STREAM_STATE_BUFF_SORT) {
2,062✔
290
    code = recoverSession(pFileState, checkpointId);
1,797✔
291
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
265!
292
    code = recoverFillSnapshot(pFileState, checkpointId);
265✔
293
  }
294
  QUERY_CHECK_CODE(code, lino, _end);
6,190!
295

296
  void*   valBuf = NULL;
6,190✔
297
  int32_t len = 0;
6,190✔
298
  int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
6,190✔
299
  if (tmpRes == TSDB_CODE_SUCCESS) {
6,190✔
300
    QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
34!
301
    streamFileStateDecode(&pFileState->flushMark, valBuf, len);
34✔
302
    qDebug("===stream===flushMark  read:%" PRId64, pFileState->flushMark);
34✔
303
  }
304
  taosMemoryFreeClear(valBuf);
6,190!
305
  (*ppFileState) = pFileState;
6,190✔
306

307
_end:
6,190✔
308
  if (code != TSDB_CODE_SUCCESS) {
6,190!
UNCOV
309
    streamFileStateDestroy(pFileState);
×
UNCOV
310
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
311
  }
312
  return code;
6,190✔
313
}
314

315
void destroyRowBuffPos(SRowBuffPos* pPos) {
9,700,497✔
316
  taosMemoryFreeClear(pPos->pKey);
9,700,497!
317
  taosMemoryFreeClear(pPos->pRowBuff);
9,719,106!
318
  taosMemoryFree(pPos);
9,704,213!
319
}
9,727,180✔
320

321
void destroyRowBuffPosPtr(void* ptr) {
604,081✔
322
  if (!ptr) {
604,081!
UNCOV
323
    return;
×
324
  }
325
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
604,081✔
326
  if (!pPos->beUsed) {
604,081✔
327
    destroyRowBuffPos(pPos);
422✔
328
  }
329
}
330

331
void destroyRowBuffAllPosPtr(void* ptr) {
3,643,332✔
332
  if (!ptr) {
3,643,332!
UNCOV
333
    return;
×
334
  }
335
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
3,643,332✔
336
  destroyRowBuffPos(pPos);
3,643,332✔
337
}
338

339
void destroyRowBuff(void* ptr) {
6,036,851✔
340
  if (!ptr) {
6,036,851!
UNCOV
341
    return;
×
342
  }
343
  taosMemoryFree(*(void**)ptr);
6,036,851!
344
}
345

346
void streamFileStateDestroy(SStreamFileState* pFileState) {
13,748✔
347
  if (!pFileState) {
13,748✔
348
    return;
7,559✔
349
  }
350

351
  taosMemoryFree(pFileState->id);
6,189!
352
  taosMemoryFree(pFileState->cfName);
6,189!
353
  tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
6,189✔
354
  tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
6,189✔
355
  pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
6,189✔
356
  sessionWinStateCleanup(pFileState->searchBuff);
6,189✔
357
  tSimpleHashCleanup(pFileState->pGroupIdMap);
6,189✔
358
  tSimpleHashCleanup(pFileState->pRecFlagMap);
6,189✔
359
  taosMemoryFree(pFileState);
6,189!
360
}
361

362
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
6,657,413✔
363
  int32_t code = TSDB_CODE_SUCCESS;
6,657,413✔
364
  int32_t lino = 0;
6,657,413✔
365
  if (pPos->pRowBuff) {
6,657,413✔
366
    code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
6,657,365✔
367
    QUERY_CHECK_CODE(code, lino, _end);
6,657,365!
368
    pPos->pRowBuff = NULL;
6,657,365✔
369
  }
370

371
_end:
48✔
372
  if (code != TSDB_CODE_SUCCESS) {
6,657,413!
373
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
374
  }
375
  return code;
6,657,413✔
376
}
377

378
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
6,767✔
379
  int32_t   code = TSDB_CODE_SUCCESS;
6,767✔
380
  int32_t   lino = 0;
6,767✔
381
  SListIter iter = {0};
6,767✔
382
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
6,767✔
383

384
  SListNode* pNode = NULL;
6,767✔
385
  while ((pNode = tdListNext(&iter)) != NULL) {
8,065,260✔
386
    SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
8,058,493✔
387
    if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) {
8,058,493!
388
      code = putFreeBuff(pFileState, pPos);
6,052,175✔
389
      QUERY_CHECK_CODE(code, lino, _end);
6,052,175!
390

391
      if (!all) {
6,052,175✔
392
        pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
5,786,660✔
393
      }
394
      destroyRowBuffPos(pPos);
6,052,175✔
395
      SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
6,052,175✔
396
      taosMemoryFreeClear(tmp);
6,052,175!
397
    }
398
  }
399

400
_end:
6,767✔
401
  if (code != TSDB_CODE_SUCCESS) {
6,767!
UNCOV
402
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
403
  }
404
}
6,767✔
405

406
int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool all) {
254✔
407
  int32_t   code = TSDB_CODE_SUCCESS;
254✔
408
  int32_t   lino = 0;
254✔
409
  uint64_t  i = 0;
254✔
410
  SListIter iter = {0};
254✔
411
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
254✔
412

413
  SListNode* pNode = NULL;
254✔
414
  while ((pNode = tdListNext(&iter)) != NULL && i < max) {
2,417,599✔
415
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
2,417,345✔
416
    if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) {
2,417,345✔
417
      if (all || !pPos->beUsed) {
1,214,482!
418
        if (all && !pPos->pRowBuff) {
406!
UNCOV
419
          continue;
×
420
        }
421
        code = tdListAppend(pFlushList, &pPos);
406✔
422
        QUERY_CHECK_CODE(code, lino, _end);
406!
423

424
        pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
406✔
425
        pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
406✔
426
        if (pPos->beUsed == false) {
406!
427
          SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
406✔
428
          taosMemoryFreeClear(tmp);
406!
429
        }
430
        if (pPos->pRowBuff) {
406✔
431
          i++;
358✔
432
        }
433
      }
434
    }
435
  }
436
  qDebug("clear flushed row buff. %d rows to disk. is all:%d", listNEles(pFlushList), all);
254✔
437

438
_end:
5✔
439
  if (code != TSDB_CODE_SUCCESS) {
254!
UNCOV
440
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
441
  }
442
  return code;
254✔
443
}
444

445
void streamFileStateClear(SStreamFileState* pFileState) {
1,926✔
446
  pFileState->flushMark = INT64_MIN;
1,926✔
447
  pFileState->maxTs = INT64_MIN;
1,926✔
448
  tSimpleHashClear(pFileState->rowStateBuff);
1,926✔
449
  clearExpiredRowBuff(pFileState, 0, true);
1,926✔
450
}
1,926✔
451

452
bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
341✔
453

454
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { pPos->beUsed = used; }
9,212,163✔
455

456
int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
96✔
457
  int32_t   code = TSDB_CODE_SUCCESS;
96✔
458
  int32_t   lino = 0;
96✔
459
  uint64_t  i = 0;
96✔
460
  SListIter iter = {0};
96✔
461
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
96✔
462

463
  SListNode* pNode = NULL;
96✔
464
  while ((pNode = tdListNext(&iter)) != NULL && i < max) {
4,225,566✔
465
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
4,225,470✔
466
    if (pPos->beUsed == used) {
4,225,470✔
467
      if (used && !pPos->pRowBuff) {
1,810,925✔
468
        continue;
1,207,250✔
469
      }
470
      code = tdListAppend(pFlushList, &pPos);
603,675✔
471
      QUERY_CHECK_CODE(code, lino, _end);
603,675!
472

473
      pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
603,675✔
474
      pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
603,675✔
475
      if (pPos->beUsed == false) {
603,675✔
476
        SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
16✔
477
        taosMemoryFreeClear(tmp);
16!
478
      }
479
      if (pPos->pRowBuff) {
603,675!
480
        i++;
603,675✔
481
      }
482
    }
483
  }
484

485
  qInfo("%s stream state flush %d rows to disk. is used:%d", pFileState->id, listNEles(pFlushList), used);
96!
486

UNCOV
487
_end:
×
488
  if (code != TSDB_CODE_SUCCESS) {
96!
UNCOV
489
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
490
  }
491
  return code;
96✔
492
}
493

494
int32_t flushRowBuff(SStreamFileState* pFileState) {
254✔
495
  int32_t          code = TSDB_CODE_SUCCESS;
254✔
496
  int32_t          lino = 0;
254✔
497
  SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
254✔
498
  if (!pFlushList) {
254!
UNCOV
499
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
500
    QUERY_CHECK_CODE(code, lino, _end);
×
501
  }
502

503
  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
254✔
504
  num = TMAX(num, FLUSH_NUM);
254✔
505
  code = clearFlushedRowBuff(pFileState, pFlushList, num, false);
254✔
506
  QUERY_CHECK_CODE(code, lino, _end);
254!
507

508
  if (isListEmpty(pFlushList)) {
254✔
509
    code = popUsedBuffs(pFileState, pFlushList, num, false);
50✔
510
    QUERY_CHECK_CODE(code, lino, _end);
50!
511

512
    if (isListEmpty(pFlushList)) {
50✔
513
      code = popUsedBuffs(pFileState, pFlushList, num, true);
46✔
514
      QUERY_CHECK_CODE(code, lino, _end);
46!
515
    }
516
  }
517

518
  if (pFileState->searchBuff) {
254!
UNCOV
519
    code = clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true);
×
UNCOV
520
    QUERY_CHECK_CODE(code, lino, _end);
×
521
  }
522

523
  flushSnapshot(pFileState, pFlushList, false);
254✔
524

525
  SListIter fIter = {0};
254✔
526
  tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
254✔
527
  SListNode* pNode = NULL;
254✔
528
  while ((pNode = tdListNext(&fIter)) != NULL) {
604,335✔
529
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
604,081✔
530
    code = putFreeBuff(pFileState, pPos);
604,081✔
531
    QUERY_CHECK_CODE(code, lino, _end);
604,081!
532
  }
533

534
  tdListFreeP(pFlushList, destroyRowBuffPosPtr);
254✔
535

536
_end:
254✔
537
  if (code != TSDB_CODE_SUCCESS) {
254!
UNCOV
538
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
539
  }
540
  return code;
254✔
541
}
542

543
int32_t clearRowBuff(SStreamFileState* pFileState) {
253✔
544
  if (pFileState->deleteMark != INT64_MAX) {
253!
UNCOV
545
    clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
×
546
  }
547
  do {
548
    int32_t code = flushRowBuff(pFileState);
254✔
549
    if (code != TSDB_CODE_SUCCESS) {
254!
UNCOV
550
      return code;
×
551
    }
552
  } while (isListEmpty(pFileState->freeBuffs) && pFileState->curRowCount == pFileState->maxRowCount);
254!
553
  return TSDB_CODE_SUCCESS;
253✔
554
}
555

556
int32_t clearFlushedRowBuffByFlag(SStreamFileState* pFileState, uint64_t max) {
4,320✔
557
  int32_t   code = TSDB_CODE_SUCCESS;
4,320✔
558
  int32_t   lino = 0;
4,320✔
559
  uint64_t  i = 0;
4,320✔
560
  SListIter iter = {0};
4,320✔
561
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
4,320✔
562

563
  SListNode* pNode = NULL;
4,320✔
564
  while ((pNode = tdListNext(&iter)) != NULL && i < max) {
6,947!
565
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
2,627✔
566
    if (pPos->invalid) {
2,627✔
567
      if (!pPos->beUsed) {
445!
568
        SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
445✔
569
        taosMemoryFreeClear(tmp);
445!
570
        if (pPos->pRowBuff) {
445!
571
          i++;
445✔
572
        }
573
        code = putFreeBuff(pFileState, pPos);
445✔
574
        QUERY_CHECK_CODE(code, lino, _end);
445!
575
        destroyRowBuffPos(pPos);
445✔
576
      }
577
    }
578
  }
579

580
_end:
4,320✔
581
  if (code != TSDB_CODE_SUCCESS) {
4,320!
UNCOV
582
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
583
  }
584
  return code;
4,320✔
585
}
586

587
int32_t clearRowBuffNonFlush(SStreamFileState* pFileState) {
4,320✔
588
  int32_t code = TSDB_CODE_SUCCESS;
4,320✔
589
  int32_t lino = 0;
4,320✔
590

591
  if (pFileState->deleteMark != INT64_MAX) {
4,320!
UNCOV
592
    clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
×
593
  }
594

595
  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
4,320✔
596
  num = TMAX(num, FLUSH_NUM);
4,320✔
597
  code = clearFlushedRowBuffByFlag(pFileState, num);
4,320✔
598
  QUERY_CHECK_CODE(code, lino, _end);
4,320!
599

600
_end:
4,320✔
601
  if (code != TSDB_CODE_SUCCESS) {
4,320!
UNCOV
602
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
603
  }
604
  return code;
4,320✔
605
}
606

607
void* getFreeBuff(SStreamFileState* pFileState) {
9,727,640✔
608
  SList*     lists = pFileState->freeBuffs;
9,727,640✔
609
  int32_t    buffSize = pFileState->rowSize;
9,727,640✔
610
  SListNode* pNode = tdListPopHead(lists);
9,727,640✔
611
  if (!pNode) {
9,727,614✔
612
    return NULL;
9,183,963✔
613
  }
614
  void* ptr = *(void**)pNode->data;
543,651✔
615
  memset(ptr, 0, buffSize);
543,651✔
616
  taosMemoryFree(pNode);
543,651!
617
  return ptr;
543,662✔
618
}
619

UNCOV
620
void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
×
UNCOV
621
  if (pPos->pRowBuff) {
×
UNCOV
622
    memset(pPos->pRowBuff, 0, pFileState->rowSize);
×
623
  }
UNCOV
624
}
×
625

626
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
9,726,876✔
627
  int32_t      code = TSDB_CODE_SUCCESS;
9,726,876✔
628
  int32_t      lino = 0;
9,726,876✔
629
  SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
9,726,876!
630
  if (!pPos) {
9,727,179!
UNCOV
631
    code = terrno;
×
UNCOV
632
    QUERY_CHECK_CODE(code, lino, _error);
×
633
  }
634

635
  pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
9,727,179!
636
  if (!pPos->pKey) {
9,727,003!
UNCOV
637
    code = terrno;
×
UNCOV
638
    QUERY_CHECK_CODE(code, lino, _error);
×
639
  }
640

641
  void* pBuff = getFreeBuff(pFileState);
9,727,003✔
642
  if (pBuff) {
9,726,907✔
643
    pPos->pRowBuff = pBuff;
543,085✔
644
    goto _end;
543,085✔
645
  }
646

647
  if (pFileState->curRowCount < pFileState->maxRowCount) {
9,183,822✔
648
    pBuff = taosMemoryCalloc(1, pFileState->rowSize);
9,183,680!
649
    QUERY_CHECK_NULL(pBuff, code, lino, _error, terrno);
9,183,818!
650
    pPos->pRowBuff = pBuff;
9,183,818✔
651
    pFileState->curRowCount++;
9,183,818✔
652
    goto _end;
9,183,818✔
653
  }
654

655
  code = clearRowBuff(pFileState);
142✔
656
  QUERY_CHECK_CODE(code, lino, _error);
153!
657

658
  pPos->pRowBuff = getFreeBuff(pFileState);
153✔
659

660
_end:
9,727,056✔
661
  code = tdListAppend(pFileState->usedBuffs, &pPos);
9,727,056✔
662
  QUERY_CHECK_CODE(code, lino, _error);
9,727,111!
663

664
  QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
9,727,111!
665
_error:
9,727,111✔
666
  if (code != TSDB_CODE_SUCCESS) {
9,727,111!
UNCOV
667
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
668
    return NULL;
×
669
  }
670

671
  return pPos;
9,727,111✔
672
}
673

674
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
9,726,880✔
675
  int32_t      code = TSDB_CODE_SUCCESS;
9,726,880✔
676
  int32_t      lino = 0;
9,726,880✔
677
  SRowBuffPos* newPos = getNewRowPos(pFileState);
9,726,880✔
678
  if (!newPos) {
9,727,103!
UNCOV
679
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
680
    QUERY_CHECK_CODE(code, lino, _error);
×
681
  }
682
  newPos->beUsed = true;
9,727,103✔
683
  newPos->beFlushed = false;
9,727,103✔
684
  newPos->needFree = false;
9,727,103✔
685
  newPos->beUpdated = true;
9,727,103✔
686
  newPos->invalid = false;
9,727,103✔
687
  return newPos;
9,727,103✔
688

UNCOV
689
_error:
×
UNCOV
690
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
691
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
692
  }
UNCOV
693
  return NULL;
×
694
}
695

696
int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
10,653,516✔
697
                             int32_t* pWinCode) {
698
  int32_t code = TSDB_CODE_SUCCESS;
10,653,516✔
699
  int32_t lino = 0;
10,653,516✔
700
  (*pWinCode) = TSDB_CODE_SUCCESS;
10,653,516✔
701
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
10,653,516✔
702
  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
10,653,488✔
703
  if (pos) {
10,653,775✔
704
    if (pVal != NULL) {
938,378!
705
      *pVLen = pFileState->rowSize;
938,395✔
706
      *pVal = *pos;
938,395✔
707
      (*pos)->beUsed = true;
938,395✔
708
      (*pos)->beFlushed = false;
938,395✔
709
    }
710
    goto _end;
938,378✔
711
  }
712
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
9,715,397✔
713
  if (!pNewPos || !pNewPos->pRowBuff) {
9,715,605!
UNCOV
714
    code = TSDB_CODE_OUT_OF_MEMORY;
×
715
    QUERY_CHECK_CODE(code, lino, _end);
×
716
  }
717

718
  memcpy(pNewPos->pKey, pKey, keyLen);
9,715,605✔
719
  (*pWinCode) = TSDB_CODE_FAILED;
9,715,605✔
720

721
  TSKEY ts = pFileState->getTs(pKey);
9,715,605✔
722
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
9,715,589✔
723
    int32_t len = 0;
280,844✔
724
    void*   p = NULL;
280,844✔
725
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
280,844✔
726
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
280,843✔
727
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
280,843✔
728
      memcpy(pNewPos->pRowBuff, p, len);
40✔
729
    }
730
    taosMemoryFree(p);
280,843!
731
  }
732

733
  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
9,715,546✔
734
  QUERY_CHECK_CODE(code, lino, _end);
9,715,485!
735

736
  if (pVal) {
9,715,485!
737
    *pVLen = pFileState->rowSize;
9,715,491✔
738
    *pVal = pNewPos;
9,715,491✔
739
  }
740

UNCOV
741
_end:
×
742
  if (code != TSDB_CODE_SUCCESS) {
10,653,863!
UNCOV
743
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
744
  }
745
  return code;
10,653,790✔
746
}
747

748
int32_t createRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
264✔
749
  int32_t code = TSDB_CODE_SUCCESS;
264✔
750
  int32_t lino = 0;
264✔
751

752
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
264✔
753

754
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
264✔
755
  if (!pNewPos || !pNewPos->pRowBuff) {
264!
756
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
757
    QUERY_CHECK_CODE(code, lino, _end);
×
758
  }
759

760
  memcpy(pNewPos->pKey, pKey, keyLen);
264✔
761

762
  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
264✔
763
  QUERY_CHECK_CODE(code, lino, _end);
264!
764

765
  if (pVal) {
264!
766
    *pVLen = pFileState->rowSize;
264✔
767
    *pVal = pNewPos;
264✔
768
  }
769

UNCOV
770
_end:
×
771
  if (code != TSDB_CODE_SUCCESS) {
264!
UNCOV
772
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
773
  }
774
  return code;
264✔
775
}
776

777
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
5,694✔
778
  int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
5,694✔
779
  qTrace("%s at line %d res:%d", __func__, __LINE__, code_buff);
5,695✔
780
  int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
5,695✔
781
  qTrace("%s at line %d res:%d", __func__, __LINE__, code_file);
5,689✔
782
  if (pFileState->searchBuff != NULL) {
5,689✔
783
    deleteHashSortRowBuff(pFileState, pKey);
41✔
784
  }
785
}
5,689✔
786

787
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) {
55✔
788
  SSHashObj* pRowMap = pFileState->rowStateBuff;
55✔
789
  void*   pIte = NULL;
55✔
790
  int32_t iter = 0;
55✔
791
  while ((pIte = tSimpleHashIterate(pRowMap, pIte, &iter)) != NULL) {
115✔
792
    size_t keyLen = 0;
60✔
793
    SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
60!
794
    if (pKey->groupId == groupId) {
60!
795
      int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pIte, &iter);
60✔
796
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
60!
797
    }
798
  }
799

800
  while (1) {
340✔
801
    SWinKey tmp = {.ts = INT64_MIN, .groupId = groupId};
395✔
802
    SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &tmp);
395✔
803
    SWinKey delKey = {.groupId = groupId};
395✔
804
    int32_t code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0);
395✔
805
    if (code != TSDB_CODE_SUCCESS) {
395✔
806
      break;
55✔
807
    }
808
    code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey);
340✔
809
    qTrace("%s at line %d res:%d", __func__, __LINE__, code);
340!
810
  }
811
}
55✔
812

813
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
423✔
814
  int32_t code = TSDB_CODE_SUCCESS;
423✔
815
  int32_t lino = 0;
423✔
816
  int32_t len = 0;
423✔
817
  void*   pBuff = NULL;
423✔
818
  code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len);
423✔
819
  QUERY_CHECK_CODE(code, lino, _end);
423!
820
  memcpy(pPos->pRowBuff, pBuff, len);
423✔
821
  taosMemoryFree(pBuff);
423!
822

823
_end:
423✔
824
  if (code != TSDB_CODE_SUCCESS) {
423!
UNCOV
825
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
826
  }
827
  return code;
423✔
828
}
829

830
static int32_t recoverStateRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
423✔
831
  int32_t code = TSDB_CODE_SUCCESS;
423✔
832
  int32_t lino = 0;
423✔
833
  pPos->pRowBuff = getFreeBuff(pFileState);
423✔
834
  if (!pPos->pRowBuff) {
423✔
835
    if (pFileState->curRowCount < pFileState->maxRowCount) {
102✔
836
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
2!
837
      if (!pPos->pRowBuff) {
2!
UNCOV
838
        code = terrno;
×
UNCOV
839
        QUERY_CHECK_CODE(code, lino, _end);
×
840
      }
841
      pFileState->curRowCount++;
2✔
842
    } else {
843
      code = clearRowBuff(pFileState);
100✔
844
      QUERY_CHECK_CODE(code, lino, _end);
100!
845
      pPos->pRowBuff = getFreeBuff(pFileState);
100✔
846
    }
847
    QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
102!
848
  }
849

850
  code = recoverSessionRowBuff(pFileState, pPos);
423✔
851
  QUERY_CHECK_CODE(code, lino, _end);
423!
852

853
_end:
423✔
854
  if (code != TSDB_CODE_SUCCESS) {
423!
UNCOV
855
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
856
  }
857
  return code;
423✔
858
}
859

860
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
9,133,265✔
861
  int32_t code = TSDB_CODE_SUCCESS;
9,133,265✔
862
  int32_t lino = 0;
9,133,265✔
863
  if (pPos->pRowBuff) {
9,133,265✔
864
    if (pPos->needFree) {
9,132,852!
UNCOV
865
      code = recoverSessionRowBuff(pFileState, pPos);
×
UNCOV
866
      QUERY_CHECK_CODE(code, lino, _end);
×
867
    }
868
    (*pVal) = pPos->pRowBuff;
9,132,852✔
869
    goto _end;
9,132,852✔
870
  }
871

872
  code = recoverStateRowBuff(pFileState, pPos);
413✔
873
  QUERY_CHECK_CODE(code, lino, _end);
423!
874

875
  (*pVal) = pPos->pRowBuff;
423✔
876
  // if (!pPos->needFree) {
877
  //   code = tdListPrepend(pFileState->usedBuffs, &pPos);
878
  //   QUERY_CHECK_CODE(code, lino, _end);
879
  // }
880

881
_end:
9,133,275✔
882
  if (code != TSDB_CODE_SUCCESS) {
9,133,275!
UNCOV
883
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
884
  }
885
  return code;
9,133,271✔
886
}
887

888
bool hasRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, bool hasLimit, bool* pIsLast) {
29✔
889
  bool res = false;
29✔
890
  if (pIsLast != NULL) {
29!
UNCOV
891
    (*pIsLast) = false;
×
892
  }
893
  
894
  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
29✔
895
  if (pos) {
29✔
896
    res = true;
13✔
897
  }
898
  void* pSearchBuff = getSearchBuff(pFileState);
29✔
899
  if (pSearchBuff != NULL) {
29✔
900
    void** ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
3✔
901
    if (ppBuff != NULL) {
3✔
902
      SArray* pWinStates = (SArray*)(*ppBuff);
2✔
903
      if (pIsLast != NULL) {
2!
904
        SWinKey* pLastKey = (SWinKey*) taosArrayGetLast(pWinStates);
×
905
        *pIsLast = (winKeyCmprImpl(pKey, pLastKey) == 0);
×
906
      }
907
      if (hasLimit && taosArrayGetSize(pWinStates) <= MIN_NUM_OF_SORT_CACHE_WIN) {
2!
908
        res = true;
2✔
909
      }
910
      if (qDebugFlag & DEBUG_DEBUG) {
2!
911
        SWinKey* fistKey = (SWinKey*)taosArrayGet(pWinStates, 0);
2✔
912
        qDebug("===stream===check window state. buff min ts:%" PRId64 ",groupId:%" PRIu64 ".key ts:%" PRId64
2!
913
               ",groupId:%" PRIu64,
914
               fistKey->ts, fistKey->groupId, pKey->ts, pKey->groupId);
915
      }
916
    } else {
917
      res = true;
1✔
918
    }
919
  }
920
  return res;
29✔
921
}
922

923
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
4,841✔
924
  int64_t mark = (pFileState->deleteMark == INT64_MAX || pFileState->maxTs == INT64_MIN)
1,896✔
925
                     ? INT64_MIN
926
                     : pFileState->maxTs - pFileState->deleteMark;
6,737✔
927
  clearExpiredRowBuff(pFileState, mark, false);
4,841✔
928
  return pFileState->usedBuffs;
4,841✔
929
}
930

931
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
5,095✔
932
  int32_t   code = TSDB_CODE_SUCCESS;
5,095✔
933
  int32_t   lino = 0;
5,095✔
934
  SListIter iter = {0};
5,095✔
935
  tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);
5,095✔
936

937
  int64_t    st = taosGetTimestampMs();
5,095✔
938
  SListNode* pNode = NULL;
5,095✔
939

940
  int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
5,095✔
941

942
  int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
5,095✔
943
  char*   buf = taosMemoryCalloc(1, len);
5,095!
944
  if (!buf) {
5,095!
UNCOV
945
    code = terrno;
×
UNCOV
946
    QUERY_CHECK_CODE(code, lino, _end);
×
947
  }
948

949
  void* batch = streamStateCreateBatch();
5,095✔
950
  if (!batch) {
5,095!
UNCOV
951
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
952
    QUERY_CHECK_CODE(code, lino, _end);
×
953
  }
954

955
  while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
2,615,447✔
956
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
2,610,335✔
957
    if (pPos->beFlushed || !pPos->pRowBuff) {
2,610,335!
958
      continue;
578,647✔
959
    }
960
    pPos->beFlushed = true;
2,031,688✔
961
    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
2,031,688✔
962

963
    qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
2,031,648✔
964
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
2,031,648✔
965
      code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
7,854✔
966
      streamStateClearBatch(batch);
7,854✔
967
      QUERY_CHECK_CODE(code, lino, _end);
7,854!
968
    }
969

970
    void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
2,031,671✔
971
    QUERY_CHECK_NULL(pSKey, code, lino, _end, terrno);
2,031,681!
972

973
    code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
2,031,681✔
974
                                       0, buf);
975
    taosMemoryFreeClear(pSKey);
2,031,691!
976
    QUERY_CHECK_CODE(code, lino, _end);
2,031,705!
977
    // todo handle failure
978
    memset(buf, 0, len);
2,031,705✔
979
  }
980
  taosMemoryFreeClear(buf);
5,122!
981

982
  int32_t numOfElems = streamStateGetBatchSize(batch);
5,122✔
983
  if (numOfElems > 0) {
5,095✔
984
    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
1,224✔
985
    QUERY_CHECK_CODE(code, lino, _end);
1,224!
986
  } else {
987
    goto _end;
3,871✔
988
  }
989

990
  streamStateClearBatch(batch);
1,224✔
991

992
  clearSearchBuff(pFileState);
1,224✔
993

994
  int64_t elapsed = taosGetTimestampMs() - st;
1,224✔
995
  qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
1,224✔
996
         pFileState->id, numOfElems, BATCH_LIMIT, elapsed);
997

998
  if (flushState) {
1,224✔
999
    void*   valBuf = NULL;
1,172✔
1000
    int32_t len = 0;
1,172✔
1001
    code = streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
1,172✔
1002
    QUERY_CHECK_CODE(code, lino, _end);
1,172!
1003

1004
    qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
1,172✔
1005
    code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, len, 0);
1,172✔
1006
    taosMemoryFree(valBuf);
1,172!
1007
    QUERY_CHECK_CODE(code, lino, _end);
1,172!
1008

1009
    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
1,172✔
1010
    QUERY_CHECK_CODE(code, lino, _end);
1,172!
1011
  }
1012

1013
_end:
52✔
1014
  if (code != TSDB_CODE_SUCCESS) {
5,095!
UNCOV
1015
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1016
  }
1017
  taosMemoryFree(buf);
5,095!
1018
  streamStateDestroyBatch(batch);
5,095✔
1019
}
5,095✔
1020

1021
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
×
1022
  char keyBuf[128] = {0};
×
1023
  TAOS_UNUSED(tsnprintf(keyBuf, sizeof(keyBuf), "%s:%" PRId64 "", TASK_KEY, checkpointId));
×
1024
  return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
×
1025
}
1026

1027
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
×
1028
  int32_t code = TSDB_CODE_SUCCESS;
×
1029
  int64_t maxCheckPointId = 0;
×
1030
  {
1031
    char    buf[128] = {0};
×
1032
    void*   val = NULL;
×
1033
    int32_t len = 0;
×
1034
    memcpy(buf, TASK_KEY, strlen(TASK_KEY));
×
UNCOV
1035
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
×
1036
    if (code != 0 || len == 0 || val == NULL) {
×
UNCOV
1037
      return TSDB_CODE_FAILED;
×
1038
    }
1039
    memcpy(buf, val, len);
×
UNCOV
1040
    buf[len] = 0;
×
1041
    maxCheckPointId = taosStr2Int64((char*)buf, NULL, 10);
×
UNCOV
1042
    taosMemoryFree(val);
×
1043
  }
UNCOV
1044
  for (int64_t i = maxCheckPointId; i > 0; i--) {
×
UNCOV
1045
    char    buf[128] = {0};
×
UNCOV
1046
    void*   val = 0;
×
UNCOV
1047
    int32_t len = 0;
×
UNCOV
1048
    TAOS_UNUSED(tsnprintf(buf, sizeof(buf), "%s:%" PRId64 "", TASK_KEY, i));
×
UNCOV
1049
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
×
UNCOV
1050
    if (code != 0) {
×
UNCOV
1051
      return TSDB_CODE_FAILED;
×
1052
    }
UNCOV
1053
    memcpy(buf, val, len);
×
UNCOV
1054
    buf[len] = 0;
×
UNCOV
1055
    taosMemoryFree(val);
×
1056

1057
    TSKEY ts;
UNCOV
1058
    ts = taosStr2Int64((char*)buf, NULL, 10);
×
UNCOV
1059
    if (ts < mark) {
×
1060
      // statekey winkey.ts < mark
UNCOV
1061
      int32_t tmpRes = forceRemoveCheckpoint(pFileState, i);
×
UNCOV
1062
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
UNCOV
1063
      break;
×
1064
    }
1065
  }
UNCOV
1066
  return code;
×
1067
}
1068

1069
int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
1,793✔
1070
  int32_t code = TSDB_CODE_SUCCESS;
1,793✔
1071
  int32_t lino = 0;
1,793✔
1072
  int32_t winRes = TSDB_CODE_SUCCESS;
1,793✔
1073
  if (pFileState->maxTs != INT64_MIN) {
1,793!
UNCOV
1074
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
1075
                       ? INT64_MIN
UNCOV
1076
                       : pFileState->maxTs - pFileState->deleteMark;
×
UNCOV
1077
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
×
UNCOV
1078
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
1079
  }
1080

1081
  SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore, INT64_MAX);
1,793✔
1082
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
1,797✔
1083
  while (winRes == TSDB_CODE_SUCCESS) {
1,798!
1084
    if (pFileState->curRowCount >= recoverNum) {
1,798!
1085
      break;
1,796✔
1086
    }
1087

1088
    void*       pVal = NULL;
1,798✔
1089
    int32_t     vlen = 0;
1,798✔
1090
    SSessionKey key = {0};
1,798✔
1091
    winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen);
1,798✔
1092
    if (winRes != TSDB_CODE_SUCCESS) {
1,797✔
1093
      break;
1,796✔
1094
    }
1095

1096
    if (vlen != pFileState->rowSize) {
1!
UNCOV
1097
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
1098
      QUERY_CHECK_CODE(code, lino, _end);
×
1099
    }
1100

1101
    SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
1✔
1102
    pPos->beUsed = false;
1✔
1103
    winRes = putSessionWinResultBuff(pFileState, pPos);
1✔
1104
    if (winRes != TSDB_CODE_SUCCESS) {
1!
UNCOV
1105
      break;
×
1106
    }
1107

1108
    winRes = streamStateSessionCurPrev_rocksdb(pCur);
1✔
1109
  }
1110

1111
_end:
×
1112
  if (code != TSDB_CODE_SUCCESS) {
1,796!
1113
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1114
  }
1115
  streamStateFreeCur(pCur);
1,796✔
1116
  return code;
1,796✔
1117
}
1118

1119
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
4,128✔
1120
  int32_t code = TSDB_CODE_SUCCESS;
4,128✔
1121
  int32_t lino = 0;
4,128✔
1122
  int32_t winCode = TSDB_CODE_SUCCESS;
4,128✔
1123
  if (pFileState->maxTs != INT64_MIN) {
4,128!
UNCOV
1124
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
1125
                       ? INT64_MIN
1126
                       : pFileState->maxTs - pFileState->deleteMark;
×
UNCOV
1127
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
×
UNCOV
1128
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
1129
  }
1130

1131
  SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
4,128✔
1132
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
4,128✔
1133
  while (winCode == TSDB_CODE_SUCCESS) {
4,128!
1134
    if (pFileState->curRowCount >= recoverNum) {
4,128!
1135
      break;
4,128✔
1136
    }
1137

1138
    void*        pVal = NULL;
4,128✔
1139
    int32_t      vlen = 0;
4,128✔
1140
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
4,128✔
1141
    if (!pNewPos || !pNewPos->pRowBuff) {
4,128!
UNCOV
1142
      code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
1143
      QUERY_CHECK_CODE(code, lino, _end);
×
1144
    }
1145

1146
    winCode =
1147
        streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
4,128✔
1148
    qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__);
4,128✔
1149
    if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
4,128!
1150
      destroyRowBuffPos(pNewPos);
4,128✔
1151
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
4,128✔
1152
      taosMemoryFreeClear(pNode);
4,128!
1153
      taosMemoryFreeClear(pVal);
4,128!
1154
      break;
4,128✔
1155
    }
UNCOV
1156
    if (vlen != pFileState->rowSize) {
×
UNCOV
1157
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
×
UNCOV
1158
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
1159
      taosMemoryFreeClear(pVal);
×
UNCOV
1160
      QUERY_CHECK_CODE(code, lino, _end);
×
1161
    }
UNCOV
1162
    memcpy(pNewPos->pRowBuff, pVal, vlen);
×
UNCOV
1163
    taosMemoryFreeClear(pVal);
×
UNCOV
1164
    pNewPos->beFlushed = true;
×
UNCOV
1165
    pNewPos->beUsed = false;
×
UNCOV
1166
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
×
1167
    code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
×
1168
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1169
      destroyRowBuffPos(pNewPos);
×
UNCOV
1170
      break;
×
1171
    }
UNCOV
1172
    streamStateCurPrev_rocksdb(pCur);
×
1173
  }
1174

UNCOV
1175
_end:
×
1176
  if (code != TSDB_CODE_SUCCESS) {
4,128!
UNCOV
1177
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1178
  }
1179
  streamStateFreeCur(pCur);
4,128✔
1180
  return code;
4,128✔
1181
}
1182

1183
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
272,410✔
1184

1185
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
1,506✔
1186
  pFileState->flushMark = TMAX(pFileState->flushMark, ts);
1,506✔
1187
  pFileState->maxTs = TMAX(pFileState->maxTs, ts);
1,506✔
1188
}
1,506✔
1189

1190
void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStateBuff; }
41,533✔
1191
void* getSearchBuff(SStreamFileState* pFileState) { return pFileState->searchBuff; }
10,385,642✔
1192

1193
void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; }
20,872✔
1194

1195
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
9,716,969✔
1196
  return pFileState->deleteMark != INT64_MAX && pFileState->maxTs > 0 &&
15,519,322!
1197
         ts < (pFileState->maxTs - pFileState->deleteMark);
5,802,353✔
1198
}
1199

1200
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
8,669,608✔
1201

1202
TSKEY getFlushMark(SStreamFileState* pFileState) { return pFileState->flushMark; };
9✔
1203

1204
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
273,346✔
1205

1206
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
272,421✔
1207
  int32_t winCode = TSDB_CODE_SUCCESS;
272,421✔
1208
  return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen, &winCode);
272,421✔
1209
}
1210

1211
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
265✔
1212
  int32_t code = TSDB_CODE_SUCCESS;
265✔
1213
  int32_t lino = 0;
265✔
1214
  if (pFileState->maxTs != INT64_MIN) {
265!
UNCOV
1215
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
1216
                       ? INT64_MIN
UNCOV
1217
                       : pFileState->maxTs - pFileState->deleteMark;
×
UNCOV
1218
    code = deleteExpiredCheckPoint(pFileState, mark);
×
UNCOV
1219
    QUERY_CHECK_CODE(code, lino, _end);
×
1220
  }
1221

1222
  SStreamStateCur* pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore);
265✔
1223
  if (pCur == NULL) {
265✔
1224
    return code;
263✔
1225
  }
1226
  int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
2✔
1227
  int32_t winRes = TSDB_CODE_SUCCESS;
2✔
1228
  while (winRes == TSDB_CODE_SUCCESS) {
4!
1229
    if (pFileState->curRowCount >= recoverNum) {
4!
1230
      break;
2✔
1231
    }
1232

1233
    void*        pVal = NULL;
4✔
1234
    int32_t      vlen = 0;
4✔
1235
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
4✔
1236
    winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
4✔
1237
    qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__);
4!
1238
    if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
4!
1239
      destroyRowBuffPos(pNewPos);
2✔
1240
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
2✔
1241
      taosMemoryFreeClear(pNode);
2!
1242
      taosMemoryFreeClear(pVal);
2!
1243
      break;
2✔
1244
    }
1245

1246
    if (vlen != pFileState->rowSize) {
2!
UNCOV
1247
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
×
UNCOV
1248
      destroyRowBuffPos(pNewPos);
×
UNCOV
1249
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
1250
      taosMemoryFreeClear(pVal);
×
UNCOV
1251
      QUERY_CHECK_CODE(code, lino, _end);
×
1252
    }
1253

1254
    memcpy(pNewPos->pRowBuff, pVal, vlen);
2✔
1255
    taosMemoryFreeClear(pVal);
2!
1256
    pNewPos->beFlushed = true;
2✔
1257
    pNewPos->beUsed = false;
2✔
1258
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
2!
1259
    winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
2✔
1260
    if (winRes != TSDB_CODE_SUCCESS) {
2!
UNCOV
1261
      destroyRowBuffPos(pNewPos);
×
UNCOV
1262
      break;
×
1263
    }
1264
    streamStateCurPrev_rocksdb(pCur);
2✔
1265
  }
1266
  streamStateFreeCur(pCur);
2✔
1267

1268
_end:
2✔
1269
  if (code != TSDB_CODE_SUCCESS) {
2!
UNCOV
1270
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1271
  }
1272
  return code;
2✔
1273
}
1274

1275
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
1,833✔
1276
                   int32_t* pWinCode) {
1277
  int32_t code = TSDB_CODE_SUCCESS;
1,833✔
1278
  int32_t lino = 0;
1,833✔
1279
  (*pWinCode) = TSDB_CODE_FAILED;
1,833✔
1280
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
1,833✔
1281
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
1,833✔
1282
  if (ppPos) {
1,833✔
1283
    *pVLen = pFileState->rowSize;
846✔
1284
    *pVal = *ppPos;
846✔
1285
    (*ppPos)->beUsed = true;
846✔
1286
    (*ppPos)->beFlushed = false;
846✔
1287
    (*pWinCode) = TSDB_CODE_SUCCESS;
846✔
1288
    if ((*ppPos)->pRowBuff == NULL) {
846!
UNCOV
1289
      code = recoverStateRowBuff(pFileState, *ppPos);
×
UNCOV
1290
      QUERY_CHECK_CODE(code, lino, _end);
×
1291
    }
1292
    goto _end;
846✔
1293
  }
1294
  TSKEY ts = pFileState->getTs(pKey);
987✔
1295
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
987!
1296
    int32_t len = 0;
4✔
1297
    void*   p = NULL;
4✔
1298
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
4✔
1299
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
4!
1300
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
4!
1301
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
4✔
1302
      if (!pNewPos || !pNewPos->pRowBuff) {
4!
UNCOV
1303
        code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
1304
        QUERY_CHECK_CODE(code, lino, _end);
×
1305
      }
1306

1307
      memcpy(pNewPos->pKey, pKey, keyLen);
4✔
1308
      memcpy(pNewPos->pRowBuff, p, len);
4✔
1309
      code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
4✔
1310
      QUERY_CHECK_CODE(code, lino, _end);
4!
1311

1312
      if (pVal) {
4!
1313
        *pVLen = pFileState->rowSize;
4✔
1314
        *pVal = pNewPos;
4✔
1315
      }
1316
    }
1317
    taosMemoryFree(p);
4!
1318
  }
1319

1320
_end:
983✔
1321
  if (code != TSDB_CODE_SUCCESS) {
1,833!
UNCOV
1322
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1323
  }
1324
  return code;
1,833✔
1325
}
1326

1327
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen) {
476✔
1328
  int32_t code = TSDB_CODE_SUCCESS;
476✔
1329
  int32_t lino = 0;
476✔
1330
  if (value != NULL) {
476!
UNCOV
1331
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1332
    QUERY_CHECK_CODE(code, lino, _end);
×
1333
  }
1334

1335
  if (tSimpleHashGet(pFileState->pGroupIdMap, &groupId, sizeof(int64_t)) == NULL) {
476✔
1336
    if (tSimpleHashGetSize(pFileState->pGroupIdMap) <= MAX_GROUP_ID_NUM) {
170!
1337
      code = tSimpleHashPut(pFileState->pGroupIdMap, &groupId, sizeof(int64_t), NULL, 0);
170✔
1338
      QUERY_CHECK_CODE(code, lino, _end);
170!
1339
    }
1340
    code = streamStatePutParTag_rocksdb(pFileState->pFileStore, groupId, value, vLen);
170✔
1341
    QUERY_CHECK_CODE(code, lino, _end);
170!
1342
  }
1343

1344
_end:
476✔
1345
  if (code != TSDB_CODE_SUCCESS) {
476!
UNCOV
1346
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1347
  }
1348
  return code;
476✔
1349
}
1350

1351
void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
1,329✔
1352
  SStreamFileState* pFileState = (SStreamFileState*)pCur->pStreamFileState;
1,329✔
1353
  if (pCur->hashIter == -1) {
1,329✔
1354
    streamStateCurNext(pFileState->pFileStore, pCur);
70✔
1355
    return;
70✔
1356
  }
1357

1358
  int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
1,259!
1359
  pCur->minGpId = TMAX(pCur->minGpId, gpId);
1,259✔
1360

1361
  SSHashObj* pHash = pFileState->pGroupIdMap;
1,259✔
1362
  pCur->pHashData = tSimpleHashIterate(pHash, pCur->pHashData, &pCur->hashIter);
1,259✔
1363
  if (!pCur->pHashData) {
1,259✔
1364
    pCur->hashIter = -1;
773✔
1365
    streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur);
773✔
1366
    return;
773✔
1367
  }
1368
}
1369

1370
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
3,984✔
1371
  int32_t code = TSDB_CODE_SUCCESS;
3,984✔
1372
  if (pCur->pHashData) {
3,984✔
1373
    *pKey = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
1,259!
1374
    return code;
1,259✔
1375
  }
1376
  return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL);
2,725✔
1377
}
1378

1379
SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
2,655✔
1380
  return pFileState->pGroupIdMap;
2,655✔
1381
}
1382

1383
void clearExpiredState(SStreamFileState* pFileState, int32_t numOfKeep, TSKEY minTs) {
4,320✔
1384
  int32_t    code = TSDB_CODE_SUCCESS;
4,320✔
1385
  int32_t    lino = 0;
4,320✔
1386
  SSHashObj* pSearchBuff = pFileState->searchBuff;
4,320✔
1387
  void*      pIte = NULL;
4,320✔
1388
  int32_t    iter = 0;
4,320✔
1389
  while ((pIte = tSimpleHashIterate(pSearchBuff, pIte, &iter)) != NULL) {
6,479✔
1390
    SArray* pWinStates = *((void**)pIte);
2,159✔
1391
    int32_t arraySize = TARRAY_SIZE(pWinStates);
2,159✔
1392
    if (minTs != INT64_MAX && arraySize > numOfKeep) {
2,159✔
1393
      SWinKey key = {.ts = minTs};
10!
1394
      key.groupId = *(uint64_t*)tSimpleHashGetKey(pIte, NULL);
10✔
1395
      int32_t index = binarySearch(pWinStates, arraySize, &key, fillStateKeyCompare);
10✔
1396
      numOfKeep = TMAX(arraySize - index, MIN_NUM_OF_SORT_CACHE_WIN);
10✔
1397
      qDebug("modify numOfKeep, numOfKeep:%d. %s at line %d", numOfKeep, __func__, __LINE__);
10!
1398
    }
1399

1400
    int32_t size = arraySize - numOfKeep;
2,159✔
1401
    for (int32_t i = 0; i < size; i++) {
2,605✔
1402
      SWinKey* pKey = taosArrayGet(pWinStates, i);
446✔
1403
      int32_t  code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
446✔
1404
      qTrace("clear expired buff, ts:%" PRId64 ",groupid:%" PRIu64 ". %s at line %d res:%d", pKey->ts, pKey->groupId, __func__, __LINE__, code_buff);
446!
1405

1406
      if (isFlushedState(pFileState, pKey->ts, 0)) {
446✔
1407
        int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
6✔
1408
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file);
6!
1409
      }
1410

1411
      if (tSimpleHashGetSize(pFileState->pRecFlagMap) > 0) {
446!
1412
        tSimpleHashRemove(pFileState->pRecFlagMap, pKey, sizeof(SWinKey));
×
1413
      }
1414
    }
1415
    taosArrayRemoveBatch(pWinStates, 0, size, NULL);
2,159✔
1416
  }
1417
  code = clearRowBuffNonFlush(pFileState);
4,320✔
1418
  QUERY_CHECK_CODE(code, lino, _end);
4,320!
1419

1420
_end:
4,320✔
1421
  if (code != TSDB_CODE_SUCCESS) {
4,320!
UNCOV
1422
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1423
  }
1424
}
4,320✔
1425

1426
#ifdef BUILD_NO_CALL
1427
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
1428
                              int32_t* pWinCode) {
1429
  int32_t code = TSDB_CODE_SUCCESS;
1430
  int32_t lino = 0;
1431

1432
  code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
1433
  QUERY_CHECK_CODE(code, lino, _end);
1434

1435
  SArray*    pWinStates = NULL;
1436
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
1437
  void**     ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
1438
  if (ppBuff) {
1439
    pWinStates = (SArray*)(*ppBuff);
1440
  } else {
1441
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
1442
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);
1443

1444
    code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
1445
    QUERY_CHECK_CODE(code, lino, _end);
1446
  }
1447

1448
  // recover
1449
  if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
1450
    recoverHashSortBuff(pFileState, pWinStates, pKey->groupId);
1451
  }
1452

1453
  int32_t size = taosArrayGetSize(pWinStates);
1454
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
1455
  if (!isFlushedState(pFileState, pKey->ts, 0) || index >= 0) {
1456
    // find the first position which is smaller than the pKey
1457
    if (index >= 0) {
1458
      SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
1459
      if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
1460
        goto _end;
1461
      }
1462
    }
1463
    index++;
1464
    void* tmp = taosArrayInsert(pWinStates, index, pKey);
1465
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1466
  }
1467

1468
  if (size >= MAX_NUM_OF_CACHE_WIN) {
1469
    int32_t num = size - NUM_OF_CACHE_WIN;
1470
    taosArrayRemoveBatch(pWinStates, 0, num, NULL);
1471
  }
1472

1473
_end:
1474
  if (code != TSDB_CODE_SUCCESS) {
1475
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1476
  }
1477
  return code;
1478
}
1479
#endif
1480

1481
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
124✔
1482
                           int32_t* pVLen, int32_t* pWinCode) {
1483
  int32_t    code = TSDB_CODE_SUCCESS;
124✔
1484
  int32_t    lino = 0;
124✔
1485
  SArray*    pWinStates = NULL;
124✔
1486
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
124✔
1487
  void*      pState = getStateFileStore(pFileState);
124✔
1488
  void**     ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
124✔
1489
  if (ppBuff) {
124!
1490
    pWinStates = (SArray*)(*ppBuff);
124✔
UNCOV
1491
  } else if (needClearDiskBuff(pFileState)) {
×
UNCOV
1492
    qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
×
UNCOV
1493
    SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
×
UNCOV
1494
    void*            tmpVal = NULL;
×
UNCOV
1495
    int32_t          len = 0;
×
UNCOV
1496
    (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
×
UNCOV
1497
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
×
UNCOV
1498
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
UNCOV
1499
      if (!pNewPos || !pNewPos->pRowBuff) {
×
UNCOV
1500
        code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
1501
        QUERY_CHECK_CODE(code, lino, _end);
×
1502
      }
UNCOV
1503
      memcpy(pNewPos->pRowBuff, tmpVal, len);
×
UNCOV
1504
      taosMemoryFreeClear(tmpVal);
×
UNCOV
1505
      *pVLen = getRowStateRowSize(pFileState);
×
UNCOV
1506
      (*ppVal) = pNewPos;
×
1507
    }
UNCOV
1508
    streamStateFreeCur(pCur);
×
UNCOV
1509
    return code;
×
1510
  } else {
UNCOV
1511
    (*pWinCode) = TSDB_CODE_FAILED;
×
UNCOV
1512
    return code;
×
1513
  }
1514
  int32_t size = taosArrayGetSize(pWinStates);
124✔
1515
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
124✔
1516
  if (index >= 0) {
124!
1517
    SWinKey* pCurKey = taosArrayGet(pWinStates, index);
124✔
1518
    if (winKeyCmprImpl(pCurKey, pKey) == 0) {
124!
1519
      index--;
124✔
1520
    } else {
UNCOV
1521
      qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__,
×
1522
             __LINE__);
1523
    }
1524
  }
1525
  if (index == -1) {
124✔
1526
    SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
37✔
1527
    void*            tmpVal = NULL;
37✔
1528
    int32_t          len = 0;
37✔
1529
    (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
37✔
1530
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
37!
UNCOV
1531
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
UNCOV
1532
      if (!pNewPos || !pNewPos->pRowBuff) {
×
UNCOV
1533
        code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
1534
        QUERY_CHECK_CODE(code, lino, _end);
×
1535
      }
UNCOV
1536
      memcpy(pNewPos->pRowBuff, tmpVal, len);
×
UNCOV
1537
      taosMemoryFreeClear(tmpVal);
×
UNCOV
1538
      *pVLen = getRowStateRowSize(pFileState);
×
UNCOV
1539
      (*ppVal) = pNewPos;
×
1540
    }
1541
    streamStateFreeCur(pCur);
37✔
1542
    return code;
37✔
1543
  } else {
1544
    SWinKey* pPrevKey = taosArrayGet(pWinStates, index);
87✔
1545
    *pResKey = *pPrevKey;
87✔
1546
    return addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), ppVal, pVLen, pWinCode);
87✔
1547
  }
1548
  (*pWinCode) = TSDB_CODE_FAILED;
1549

UNCOV
1550
_end:
×
UNCOV
1551
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1552
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1553
  }
UNCOV
1554
  return code;
×
1555
}
1556

1557
int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey, bool* pIsEnd) {
2,705✔
1558
  int32_t code = TSDB_CODE_SUCCESS;
2,705✔
1559
  int32_t lino = 0;
2,705✔
1560
  int32_t size = taosArrayGetSize(pWinStates);
2,705✔
1561
  int32_t index = binarySearch(pWinStates, size, pKey, fillTSKeyCompare);
2,705✔
1562
  if (!isFlushedState(pFileState, pKey->ts, 0) || index >= 0 || size == 0) {
2,705!
1563
    if (index >= 0) {
2,705✔
1564
      SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
2,361✔
1565
      if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
2,361✔
1566
        goto _end;
1,693✔
1567
      }
1568
    }
1569
    index++;
1,012✔
1570
    (*pIsEnd) = (index >= size);
1,012✔
1571
    void* tmp = taosArrayInsert(pWinStates, index, pKey);
1,012✔
1572
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,012!
1573
  }
1574

1575
_end:
1,012✔
1576
  if (code != TSDB_CODE_SUCCESS) {
2,705!
UNCOV
1577
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1578
  }
1579
  return code;
2,705✔
1580
}
1581

1582
int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates) {
4,153✔
1583
  int32_t code = TSDB_CODE_SUCCESS;
4,153✔
1584
  int32_t lino = 0;
4,153✔
1585
  SArray* pWinStates = NULL;
4,153✔
1586
  void**  ppBuff = tSimpleHashGet(pSearchBuff, &groupId, sizeof(uint64_t));
4,153✔
1587
  if (ppBuff) {
4,153✔
1588
    pWinStates = (SArray*)(*ppBuff);
3,822✔
1589
  } else {
1590
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
331✔
1591
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);
331!
1592

1593
    code = tSimpleHashPut(pSearchBuff, &groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
331✔
1594
    QUERY_CHECK_CODE(code, lino, _end);
331!
1595
  }
1596

1597
  (*ppResStates) = pWinStates;
4,153✔
1598

1599
_end:
4,153✔
1600
  if (code != TSDB_CODE_SUCCESS) {
4,153!
UNCOV
1601
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1602
  }
1603
  return code;
4,153✔
1604
}
1605

1606
static void setValueBuff(TSKEY ts, char* pVal, int32_t len, char* pBuff, int32_t buffLen) {
55✔
1607
  SET_TSDATA_FLAG(pBuff, buffLen);
55✔
1608
  if (len == 0) {
55!
1609
    *(TSKEY*)pBuff = ts;
55✔
1610
    return;
55✔
1611
  }
UNCOV
1612
  memset(pBuff, 0, buffLen - 1);
×
UNCOV
1613
  *(TSKEY*)pBuff = ts;
×
UNCOV
1614
  memcpy(pBuff + sizeof(TSKEY), pVal, len);
×
1615
}
1616

1617
int32_t getAndSetTsData(STableTsDataState* pTsDataState, uint64_t tableUid, TSKEY* pCurTs, void** ppCurPkVal,
55✔
1618
                        TSKEY lastTs, void* pLastPkVal, int32_t lastPkLen, int32_t* pWinCode) {
1619
  int32_t code = TSDB_CODE_SUCCESS;
55✔
1620
  int32_t lino = 0;
55✔
1621
  bool    hasPk = (lastPkLen != 0);
55✔
1622

1623
  TSKEY* pDataVal = tSimpleHashGet(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t));
55✔
1624
  if (pDataVal != NULL) {
55✔
1625
    (*pWinCode) = TSDB_CODE_SUCCESS;
33✔
1626
    *pCurTs = *pDataVal;
33✔
1627
    if ((*pCurTs) < lastTs) {
33!
1628
      setValueBuff(lastTs, pLastPkVal, lastPkLen, (char*)pDataVal, pTsDataState->pkValLen);
33✔
1629
    } else {
UNCOV
1630
      if (hasPk) {
×
UNCOV
1631
        (*ppCurPkVal) = POINTER_SHIFT(pDataVal, sizeof(TSKEY));
×
UNCOV
1632
        if ((*pCurTs) == lastTs && pTsDataState->comparePkColFn((*ppCurPkVal), pLastPkVal) < 0) {
×
UNCOV
1633
          setValueBuff(lastTs, pLastPkVal, lastPkLen, (char*)pDataVal, pTsDataState->pkValLen);
×
1634
        }
1635
      }
1636
    }
1637
  } else {
1638
    setValueBuff(lastTs, pLastPkVal, lastPkLen, pTsDataState->pPkValBuff, pTsDataState->pkValLen);
22✔
1639
    int32_t size = tSimpleHashGetSize(pTsDataState->pTableTsDataMap);
22✔
1640
    if (size < MAX_STATE_MAP_SIZE) {
22!
1641
      (*pWinCode) = TSDB_CODE_FAILED;
22✔
1642
      code = tSimpleHashPut(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t), pTsDataState->pPkValBuff,
22✔
1643
                            pTsDataState->pkValLen);
22✔
1644
      QUERY_CHECK_CODE(code, lino, _end);
22!
1645
    } else {
UNCOV
1646
      (*pWinCode) = streamStateGetParTag_rocksdb(pTsDataState->pState, tableUid, &pTsDataState->pPkValBuff,
×
1647
                                                 &pTsDataState->pkValLen);
UNCOV
1648
      if ((*pWinCode) == TSDB_CODE_SUCCESS) {
×
UNCOV
1649
        *pCurTs = *(TSKEY*)pTsDataState->pPkValBuff;
×
UNCOV
1650
        if (hasPk) {
×
UNCOV
1651
          (*ppCurPkVal) = POINTER_SHIFT(pTsDataState->pPkValBuff, sizeof(TSKEY));
×
1652
        }
1653
      }
1654

UNCOV
1655
      int32_t tmpCode = streamStatePutParTag_rocksdb(pTsDataState->pState, tableUid, pTsDataState->pPkValBuff,
×
1656
                                                     pTsDataState->pkValLen);
1657
    }
1658
  }
1659

1660
_end:
55✔
1661
  if (code != TSDB_CODE_SUCCESS) {
55!
UNCOV
1662
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1663
  }
1664
  return code;
55✔
1665
}
1666

1667
int32_t doTsDataCommit(STableTsDataState* pTsDataState) {
5✔
1668
  int32_t code = TSDB_CODE_SUCCESS;
5✔
1669
  int32_t lino = 0;
5✔
1670
  void*   batch = NULL;
5✔
1671
  char*   pTempBuf = NULL;
5✔
1672

1673
  batch = streamStateCreateBatch();
5✔
1674
  QUERY_CHECK_NULL(batch, code, lino, _end, terrno);
5!
1675
  int           idx = streamStateGetCfIdx(pTsDataState->pState, "partag");
5✔
1676
  int32_t       len = (pTsDataState->pkValLen + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
5✔
1677
  pTempBuf = taosMemoryCalloc(1, len);
5!
1678
  QUERY_CHECK_NULL(pTempBuf, code, lino, _end, terrno);
5!
1679

1680
  void*   pIte = NULL;
5✔
1681
  int32_t iter = 0;
5✔
1682
  while ((pIte = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pIte, &iter)) != NULL) {
12✔
1683
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
7!
UNCOV
1684
      code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
×
UNCOV
1685
      streamStateClearBatch(batch);
×
UNCOV
1686
      QUERY_CHECK_CODE(code, lino, _end);
×
1687
    }
1688

1689
    if (HAS_TSDATA_FLAG(pIte, pTsDataState->pkValLen)) {
7!
1690
      void* pKey = tSimpleHashGetKey(pIte, NULL);
7✔
1691
      UNSET_TSDATA_FLAG(pIte, pTsDataState->pkValLen);
7✔
1692
      code = streamStatePutBatchOptimize(pTsDataState->pState, idx, batch, pKey, pIte, pTsDataState->pkValLen, 0,
7✔
1693
                                         pTempBuf);
1694
      QUERY_CHECK_CODE(code, lino, _end);
7!
1695
      memset(pTempBuf, 0, len);
7✔
1696
      qDebug("flush ts data,table id:%" PRIu64 , *(uint64_t*)pKey);
7!
1697
    }
1698
  }
1699

1700
  int32_t numOfElems = streamStateGetBatchSize(batch);
5✔
1701
  if (numOfElems > 0) {
5✔
1702
    code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
3✔
1703
    QUERY_CHECK_CODE(code, lino, _end);
3!
1704
  }
1705

1706
_end:
5✔
1707
  if (code != TSDB_CODE_SUCCESS) {
5!
UNCOV
1708
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1709
  }
1710
  taosMemoryFree(pTempBuf);
5!
1711
  streamStateDestroyBatch(batch);
5✔
1712
  return code;
5✔
1713
}
1714

1715
int32_t doRangeDataCommit(STableTsDataState* pTsDataState) {
5✔
1716
  int32_t code = TSDB_CODE_SUCCESS;
5✔
1717
  int32_t lino = 0;
5✔
1718
  void*   batch = NULL;
5✔
1719

1720
  batch = streamStateCreateBatch();
5✔
1721
  QUERY_CHECK_NULL(batch, code, lino, _end, terrno);
5!
1722
  int           idx = streamStateGetCfIdx(pTsDataState->pState, "sess");
5✔
1723
  int32_t       len = (pTsDataState->pkValLen + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
5✔
1724

1725
  int32_t size = taosArrayGetSize(pTsDataState->pScanRanges);
5✔
1726
  for (int32_t i = 0; i < size; i++) {
5!
UNCOV
1727
    SScanRange* pRange = taosArrayGet(pTsDataState->pScanRanges, i);
×
UNCOV
1728
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
×
UNCOV
1729
      code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
×
UNCOV
1730
      streamStateClearBatch(batch);
×
UNCOV
1731
      QUERY_CHECK_CODE(code, lino, _end);
×
1732
    }
UNCOV
1733
    SSessionKey key = {.win = pRange->win, .groupId = 0};
×
UNCOV
1734
    int32_t     uidSize = tSimpleHashGetSize(pRange->pUIds);
×
UNCOV
1735
    int32_t     gpIdSize = tSimpleHashGetSize(pRange->pGroupIds);
×
UNCOV
1736
    int32_t     size = uidSize + gpIdSize;
×
UNCOV
1737
    uint64_t*   pIdBuf = (uint64_t*)taosMemoryCalloc(1, size);
×
UNCOV
1738
    void*       pIte = NULL;
×
UNCOV
1739
    int32_t     iter = 0;
×
UNCOV
1740
    int32_t     i = 0;
×
UNCOV
1741
    while ((pIte = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pIte, &iter)) != NULL) {
×
UNCOV
1742
      void* pTempKey = tSimpleHashGetKey(pIte, NULL);
×
UNCOV
1743
      pIdBuf[i] = *(uint64_t*)pTempKey;
×
UNCOV
1744
      i++;
×
1745
    }
1746

UNCOV
1747
    code = streamStatePutBatchOptimize(pTsDataState->pState, idx, batch, &key, (void*)pIdBuf, size, 0,
×
1748
                                       NULL);
UNCOV
1749
    QUERY_CHECK_CODE(code, lino, _end);
×
1750
  }
1751

1752
  int32_t numOfElems = streamStateGetBatchSize(batch);
5✔
1753
  if (numOfElems > 0) {
5!
UNCOV
1754
    code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
×
UNCOV
1755
    QUERY_CHECK_CODE(code, lino, _end);
×
1756
  }
1757

1758
_end:
5✔
1759
  if (code != TSDB_CODE_SUCCESS) {
5!
UNCOV
1760
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1761
  }
1762
  streamStateDestroyBatch(batch);
5✔
1763
  return code;
5✔
1764
}
1765

1766
int32_t initTsDataState(STableTsDataState** ppTsDataState, int8_t pkType, int32_t pkLen, void* pState, void* pOtherState) {
57✔
1767
  int32_t    code = TSDB_CODE_SUCCESS;
57✔
1768
  int32_t    lino = 0;
57✔
1769

1770
  STableTsDataState* pTsDataState = taosMemoryCalloc(1, sizeof(STableTsDataState));
57!
1771
  QUERY_CHECK_NULL(pTsDataState, code, lino, _end, terrno);
57!
1772

1773
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
57✔
1774
  pTsDataState->pTableTsDataMap = tSimpleHashInit(DEFAULT_STATE_MAP_CAPACITY, hashFn);
57✔
1775
  QUERY_CHECK_NULL(pTsDataState->pTableTsDataMap, code, lino, _end, terrno);
57!
1776

1777
  pTsDataState->pkValLen = sizeof(TSKEY) + pkLen + sizeof(char);
57✔
1778
  pTsDataState->pPkValBuff = taosMemoryCalloc(1, pTsDataState->pkValLen);
57!
1779
  QUERY_CHECK_NULL(pTsDataState->pPkValBuff, code, lino, _end, terrno);
57!
1780

1781
  if (pkLen != 0) {
57!
UNCOV
1782
    pTsDataState->comparePkColFn = getKeyComparFunc(pkType, TSDB_ORDER_ASC);
×
1783
  } else {
1784
    pTsDataState->comparePkColFn = NULL;
57✔
1785
  }
1786

1787
  pTsDataState->pScanRanges = taosArrayInit(64, sizeof(SScanRange));
57✔
1788
  QUERY_CHECK_NULL(pTsDataState->pScanRanges, code, lino, _end, terrno);
57!
1789

1790
  pTsDataState->pState = pState;
57✔
1791
  pTsDataState->recValueLen = sizeof(SRecDataInfo) + pkLen;
57✔
1792
  pTsDataState->pRecValueBuff = taosMemoryCalloc(1, pTsDataState->recValueLen);
57!
1793
  QUERY_CHECK_NULL(pTsDataState->pRecValueBuff, code, lino, _end, terrno);
57!
1794

1795
  pTsDataState->curRecId = -1;
57✔
1796

1797
  pTsDataState->pStreamTaskState = pOtherState;
57✔
1798

1799
  pTsDataState->cfgIndex = streamStateGetCfIdx(pTsDataState->pState, "sess");
57✔
1800
  pTsDataState->pBatch = streamStateCreateBatch();
57✔
1801
  QUERY_CHECK_NULL(pTsDataState->pBatch, code, lino, _end, TSDB_CODE_FAILED);
57!
1802
  
1803
  pTsDataState->batchBufflen = (pTsDataState->recValueLen + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
57✔
1804
  pTsDataState->pBatchBuff = taosMemoryCalloc(1, pTsDataState->batchBufflen);
57!
1805

1806
  (*ppTsDataState) = pTsDataState;
57✔
1807

1808
_end:
57✔
1809
  if (code != TSDB_CODE_SUCCESS) {
57!
UNCOV
1810
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1811
  }
1812
  return code;
57✔
1813
}
1814

UNCOV
1815
static void destroyScanRange(SScanRange* pRange) {
×
UNCOV
1816
  pRange->win.skey = INT64_MIN;
×
UNCOV
1817
  pRange->win.ekey = INT64_MIN;
×
UNCOV
1818
  tSimpleHashCleanup(pRange->pUIds);
×
UNCOV
1819
  pRange->pUIds = NULL;
×
UNCOV
1820
  tSimpleHashCleanup(pRange->pGroupIds);
×
UNCOV
1821
  pRange->pGroupIds = NULL;
×
UNCOV
1822
}
×
1823

1824
void destroyTsDataState(STableTsDataState* pTsDataState) {
57✔
1825
  SArray* pScanRanges = pTsDataState->pScanRanges;
57✔
1826
  int32_t size = taosArrayGetSize(pScanRanges);
57✔
1827
  for (int32_t i = 0; i < size; i++) {
57!
UNCOV
1828
    SScanRange* pRange = taosArrayGet(pScanRanges, i);
×
UNCOV
1829
    destroyScanRange(pRange);
×
1830
  }
1831
  taosArrayDestroy(pTsDataState->pScanRanges);
57✔
1832
  tSimpleHashCleanup(pTsDataState->pTableTsDataMap);
57✔
1833
  taosMemoryFreeClear(pTsDataState->pPkValBuff);
57!
1834
  taosMemoryFreeClear(pTsDataState->pState);
57!
1835
  taosMemoryFreeClear(pTsDataState->pRecValueBuff);
57!
1836
  pTsDataState->pStreamTaskState = NULL;
57✔
1837

1838
  streamStateClearBatch(pTsDataState->pBatch);
57✔
1839
  streamStateDestroyBatch(pTsDataState->pBatch);
57✔
1840
  pTsDataState->pBatch = NULL;
57✔
1841
  taosMemoryFreeClear(pTsDataState->pBatchBuff);
57!
1842

1843
  taosMemoryFreeClear(pTsDataState);
57!
1844
}
57✔
1845

1846
int32_t recoverTsData(STableTsDataState* pTsDataState) {
61✔
1847
  int32_t          code = TSDB_CODE_SUCCESS;
61✔
1848
  int32_t          lino = 0;
61✔
1849
  SStreamStateCur* pCur = createStateCursor(NULL);
61✔
1850
  streamStateParTagSeekKeyNext_rocksdb(pTsDataState->pState, INT64_MIN, pCur);
61✔
1851
  while (1) {
7✔
1852
    uint64_t tableUid = 0;
68✔
1853
    void*    pVal = NULL;
68✔
1854
    int32_t  len = 0;
68✔
1855
    int32_t  winCode = streamStateParTagGetKVByCur_rocksdb(pCur, &tableUid, &pVal, &len);
68✔
1856
    if (winCode != TSDB_CODE_SUCCESS) {
68✔
1857
      break;
61✔
1858
    }
1859
    if (pTsDataState->pkValLen != len) {
7!
UNCOV
1860
      taosMemoryFree(pVal);
×
UNCOV
1861
      streamStateCurNext_rocksdb(pCur);
×
UNCOV
1862
      continue;
×
1863
    }
1864
    UNSET_TSDATA_FLAG(pVal, len);
7✔
1865
    code = tSimpleHashPut(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t), pVal, len);
7✔
1866
    taosMemoryFree(pVal);
7!
1867
    QUERY_CHECK_CODE(code, lino, _end);
7!
1868
    streamStateCurNext_rocksdb(pCur);
7✔
1869
  }
1870

1871
_end:
61✔
1872
  streamStateFreeCur(pCur);
61✔
1873
  if (code != TSDB_CODE_SUCCESS) {
61!
UNCOV
1874
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1875
  }
1876
  return code;
61✔
1877
}
1878

1879
SStreamStateCur* getLastStateCur(SStreamFileState* pFileState, getStateBuffFn fn) {
2✔
1880
  SStreamStateCur* pCur = createStateCursor(pFileState);
2✔
1881
  if (pCur == NULL) {
2!
UNCOV
1882
    return NULL;
×
1883
  }
1884
  SSHashObj* pSearchBuff = fn(pFileState);
2✔
1885
  pCur->buffIndex = 0;
2✔
1886
  pCur->hashIter = 0;
2✔
1887
  pCur->pHashData = NULL;
2✔
1888
  pCur->pHashData = tSimpleHashIterate(pSearchBuff, pCur->pHashData, &pCur->hashIter);
2✔
1889
  return pCur;
2✔
1890
}
1891

1892
void moveLastStateCurNext(SStreamStateCur* pCur, getStateBuffFn fn) {
5✔
1893
  SSHashObj* pSearchBuff = fn(pCur->pStreamFileState);
5✔
1894
  pCur->pHashData = tSimpleHashIterate(pSearchBuff, pCur->pHashData, &pCur->hashIter);
5✔
1895
}
5✔
1896

1897
int32_t getNLastStateKVByCur(SStreamStateCur* pCur, int32_t num, SArray* pRes) {
7✔
1898
  int32_t code = TSDB_CODE_SUCCESS;
7✔
1899
  int32_t lino = 0;
7✔
1900
  SArray*  pWinStates = NULL;
7✔
1901
  int32_t size = 0;
7✔
1902

1903
  while(1) {
1904
    if (pCur->pHashData == NULL) {
7✔
1905
      return TSDB_CODE_FAILED;
2✔
1906
    }
1907
    pWinStates = *((void**)pCur->pHashData);
5✔
1908
    size = taosArrayGetSize(pWinStates);
5✔
1909
    if (size > 0) {
5!
1910
      break;
5✔
1911
    }
UNCOV
1912
    moveLastStateCurNext(pCur, getSearchBuff);
×
1913
  }
1914

1915
  int32_t i = TMAX(size - num, 0);
5✔
1916

1917
  for ( ; i < size; i++) {
10✔
1918
    SWinKey* pKey = taosArrayGet(pWinStates, i);
5✔
1919
    int32_t  len = 0;
5✔
1920
    void*    pVal = NULL;
5✔
1921
    int32_t  winCode = TSDB_CODE_SUCCESS;
5✔
1922
    code = addRowBuffIfNotExist(pCur->pStreamFileState, (void*)pKey, sizeof(SWinKey), &pVal, &len, &winCode);
5✔
1923
    QUERY_CHECK_CODE(code, lino, _end);
5!
1924

1925
    if (winCode != TSDB_CODE_SUCCESS) {
5!
UNCOV
1926
      qError("%s failed at line %d since window not exist. ts:%" PRId64 ",groupId:%" PRIu64, __func__, __LINE__,
×
1927
             pKey->ts, pKey->groupId);
1928
    }
1929

1930
    void* pTempRes = taosArrayPush(pRes, &pVal);
5✔
1931
    QUERY_CHECK_NULL(pTempRes, code, lino, _end, terrno);
5!
1932
  }
1933

1934
_end:
5✔
1935
  if (code != TSDB_CODE_SUCCESS) {
5!
UNCOV
1936
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1937
  }
1938
  return code;
5✔
1939
}
1940

1941
int32_t reloadTsDataState(STableTsDataState* pTsDataState) {
4✔
1942
  int32_t code = TSDB_CODE_SUCCESS;
4✔
1943
  int32_t lino = 0;
4✔
1944

1945
  STableTsDataState tmpState = *pTsDataState;
4✔
1946
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
4✔
1947
  tmpState.pTableTsDataMap = tSimpleHashInit(DEFAULT_STATE_MAP_CAPACITY, hashFn);
4✔
1948
  QUERY_CHECK_NULL(tmpState.pTableTsDataMap, code, lino, _end, terrno);
4!
1949

1950
  code = recoverTsData(&tmpState);
4✔
1951
  QUERY_CHECK_CODE(code, lino, _end);
4!
1952

1953
  void*   pIte = NULL;
4✔
1954
  int32_t iter = 0;
4✔
1955
  while ((pIte = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pIte, &iter)) != NULL) {
4!
UNCOV
1956
    size_t keyLen = 0;
×
UNCOV
1957
    void* pKey = tSimpleHashGetKey(pIte, &keyLen);
×
UNCOV
1958
    code = tSimpleHashPut(tmpState.pTableTsDataMap, pKey, keyLen, pIte, pTsDataState->pkValLen);
×
UNCOV
1959
    QUERY_CHECK_CODE(code, lino, _end);
×
1960
  }
1961
  tSimpleHashCleanup(pTsDataState->pTableTsDataMap);
4✔
1962
  pTsDataState->pTableTsDataMap = tmpState.pTableTsDataMap;
4✔
1963

1964
_end:
4✔
1965
  if (code != TSDB_CODE_SUCCESS) {
4!
UNCOV
1966
    tSimpleHashCleanup(tmpState.pTableTsDataMap);
×
UNCOV
1967
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1968
  }
1969
  return code;
4✔
1970
}
1971

UNCOV
1972
int32_t saveRecInfoToDisk(STableTsDataState* pTsDataState, SSessionKey* pKey, SRecDataInfo* pVal, int32_t vLen) {
×
UNCOV
1973
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1974
  int32_t lino = 0;
×
1975

UNCOV
1976
  SStateSessionKey stateKey = {.key = *pKey, .opNum = ((SStreamState*)pTsDataState->pState)->number};
×
UNCOV
1977
  code = streamStatePutBatchOptimize(pTsDataState->pState, pTsDataState->cfgIndex, pTsDataState->pBatch, &stateKey, pVal, vLen, 0,
×
1978
                                     pTsDataState->pBatchBuff);
UNCOV
1979
  QUERY_CHECK_CODE(code, lino, _end);
×
1980

UNCOV
1981
  memset(pTsDataState->pBatchBuff, 0, pTsDataState->batchBufflen);
×
1982

UNCOV
1983
  if (streamStateGetBatchSize(pTsDataState->pBatch) >= BATCH_LIMIT) {
×
UNCOV
1984
    code = streamStatePutBatch_rocksdb(pTsDataState->pState, pTsDataState->pBatch);
×
UNCOV
1985
    streamStateClearBatch(pTsDataState->pBatch);
×
UNCOV
1986
    QUERY_CHECK_CODE(code, lino, _end);
×
1987
  }
1988

UNCOV
1989
_end:
×
UNCOV
1990
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1991
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1992
  }
UNCOV
1993
  return code;
×
1994
}
1995

1996
int32_t flushRemainRecInfoToDisk(STableTsDataState* pTsDataState) {
6✔
1997
  int32_t code = streamStatePutBatch_rocksdb(pTsDataState->pState, pTsDataState->pBatch);
6✔
1998
  streamStateClearBatch(pTsDataState->pBatch);
6✔
1999
  return code;
6✔
2000
}
2001

2002
int32_t recoverHashSortBuff(SStreamFileState* pFileState, SArray* pWinStates, uint64_t groupId) {
4✔
2003
  int32_t code = TSDB_CODE_SUCCESS;
4✔
2004
  int32_t lino = 0;
4✔
2005

2006
  SWinKey          start = {.groupId = groupId, .ts = INT64_MAX};
4✔
2007
  void*            pState = getStateFileStore(pFileState);
4✔
2008
  SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, &start);
4✔
2009
  for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
8!
2010
    SWinKey tmpKey = {.groupId = groupId};
8✔
2011
    int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pState, pCur, &tmpKey, NULL, 0);
8✔
2012
    if (tmpRes != TSDB_CODE_SUCCESS) {
8✔
2013
      break;
4✔
2014
    }
2015
    void* tmp = taosArrayPush(pWinStates, &tmpKey);
4✔
2016
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
4!
2017
    streamStateCurPrev_rocksdb(pCur);
4✔
2018
  }
2019
  taosArraySort(pWinStates, winKeyCmprImpl);
4✔
2020
  streamStateFreeCur(pCur);
4✔
2021

2022
_end:
4✔
2023
  if (code != TSDB_CODE_SUCCESS) {
4!
UNCOV
2024
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2025
  }
2026
  return code;
4✔
2027
}
2028

2029
int32_t getRowStateAllPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SArray* pResArray, int32_t maxNum) {
6✔
2030
  int32_t    code = TSDB_CODE_SUCCESS;
6✔
2031
  int32_t    lino = 0;
6✔
2032
  SWinKey*   pPrevKey = NULL;
6✔
2033
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
6✔
2034
  void*      pState = getStateFileStore(pFileState);
6✔
2035
  void**     ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
6✔
2036
  int32_t    num = 0;
6✔
2037
  if (ppBuff) {
6!
2038
    SArray* pWinStates = (SArray*)(*ppBuff);
6✔
2039
    int32_t size = taosArrayGetSize(pWinStates);
6✔
2040
    int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
6✔
2041
    for (; index >= 0 && num <= maxNum; index--) {
13!
2042
      pPrevKey = taosArrayGet(pWinStates, index);
7✔
2043
      if (winKeyCmprImpl(pPrevKey, pKey) == 0) {
7!
UNCOV
2044
        continue;
×
2045
      }
2046
      void*   pVal = NULL;
7✔
2047
      int32_t len = 0;
7✔
2048
      int32_t winCode = TSDB_CODE_SUCCESS;
7✔
2049
      code = addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), &pVal, &len, &winCode);
7✔
2050
      QUERY_CHECK_CODE(code, lino, _end);
7!
2051
      void* tempRes = taosArrayPush(pResArray, &pVal);
7✔
2052
      QUERY_CHECK_NULL(tempRes, code, lino, _end, terrno);
7!
2053
      num++;
7✔
2054
    }
2055
  }
2056

2057
_end:
6✔
2058
  if (code != TSDB_CODE_SUCCESS) {
6!
UNCOV
2059
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2060
  }
2061
  return code;
6✔
2062
}
2063

UNCOV
2064
int32_t setStateRecFlag(SStreamFileState* pFileState, const void* pKey, int32_t keyLen, int32_t mode) {
×
UNCOV
2065
  return tSimpleHashPut(pFileState->pRecFlagMap, pKey, keyLen, &mode, sizeof(int32_t));
×
2066
}
2067

2068
int32_t getStateRecFlag(SStreamFileState* pFileState, const void* pKey, int32_t keyLen, int32_t* pMode) {
34✔
2069
  void* pVal = tSimpleHashGet(pFileState->pRecFlagMap, pKey, keyLen);
34✔
2070
  if (pVal == NULL) {
34!
2071
    return TSDB_CODE_FAILED;
34✔
2072
  }
UNCOV
2073
  *pMode = *(int32_t*) pVal;
×
UNCOV
2074
  return TSDB_CODE_SUCCESS;
×
2075
}
2076

UNCOV
2077
void clearExpiredSessionState(SStreamFileState* pFileState, int32_t numOfKeep, TSKEY minTs, SSHashObj* pFlushGroup) {
×
UNCOV
2078
  int32_t    code = TSDB_CODE_SUCCESS;
×
UNCOV
2079
  int32_t    lino = 0;
×
UNCOV
2080
  SSHashObj* pSessionBuff = pFileState->rowStateBuff;
×
UNCOV
2081
  SStreamSnapshot* pFlushList = NULL;
×
UNCOV
2082
  if (pFlushGroup != NULL) {
×
UNCOV
2083
    pFlushList = tdListNew(POINTER_BYTES);
×
2084
  }
UNCOV
2085
  void*      pIte = NULL;
×
UNCOV
2086
  int32_t    iter = 0;
×
UNCOV
2087
  while ((pIte = tSimpleHashIterate(pSessionBuff, pIte, &iter)) != NULL) {
×
UNCOV
2088
    SArray* pWinStates = *((void**)pIte);
×
UNCOV
2089
    int32_t arraySize = TARRAY_SIZE(pWinStates);
×
UNCOV
2090
    if (minTs != INT64_MAX && arraySize > numOfKeep) {
×
UNCOV
2091
      SSessionKey key = {.win.skey = minTs, .win.ekey = minTs};
×
UNCOV
2092
      key.groupId = *(uint64_t*)tSimpleHashGetKey(pIte, NULL);
×
UNCOV
2093
      int32_t index = binarySearch(pWinStates, arraySize, &key, fillStateKeyCompare);
×
UNCOV
2094
      numOfKeep = TMAX(arraySize - index, MIN_NUM_OF_SORT_CACHE_WIN);
×
UNCOV
2095
      qDebug("modify numOfKeep, numOfKeep:%d. %s at line %d", numOfKeep, __func__, __LINE__);
×
2096
    }
2097

UNCOV
2098
    int32_t size = arraySize - numOfKeep;
×
UNCOV
2099
    for (int32_t i = 0; i < size; i++) {
×
UNCOV
2100
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
×
UNCOV
2101
      SSessionKey* pKey = pPos->pKey;
×
UNCOV
2102
      if (tSimpleHashGetSize(pFileState->pRecFlagMap) > 0) {
×
UNCOV
2103
        tSimpleHashRemove(pFileState->pRecFlagMap, pKey, sizeof(SSessionKey));
×
2104
      }
UNCOV
2105
      pPos->invalid = true;
×
2106

UNCOV
2107
      if (i == 0 && pFlushGroup != NULL) {
×
UNCOV
2108
        void* pGpVal = tSimpleHashGet(pFlushGroup, &pKey->groupId, sizeof(uint64_t));
×
UNCOV
2109
        if (pGpVal == NULL) {
×
UNCOV
2110
          code = tdListAppend(pFlushList, &pPos);
×
UNCOV
2111
          QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
2112
          code = tSimpleHashPut(pFlushGroup, &pKey->groupId, sizeof(uint64_t), NULL, 0);
×
UNCOV
2113
          QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
2114
          continue;
×
2115
        }
2116
      }
UNCOV
2117
      pPos->beFlushed = true;
×
UNCOV
2118
      qTrace("clear expired session buff, ts:%" PRId64 ",groupid:%" PRIu64 ". %s at line %d", pKey->win.skey, pKey->groupId, __func__, __LINE__);
×
2119

UNCOV
2120
      if (isFlushedState(pFileState, pKey->win.skey, 0)) {
×
UNCOV
2121
        int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
×
UNCOV
2122
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->win.skey, __func__, __LINE__, code_file);
×
2123
      }
2124
    }
UNCOV
2125
    taosArrayRemoveBatch(pWinStates, 0, size, NULL);
×
2126
  }
2127

UNCOV
2128
  if (pFlushList != NULL) {
×
UNCOV
2129
    flushSnapshot(pFileState, pFlushList, false);
×
UNCOV
2130
    code = clearRowBuffNonFlush(pFileState);
×
UNCOV
2131
    QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
2132
    tdListFreeP(pFlushList, destroyRowBuffPosPtr);
×
2133
  }
2134

UNCOV
2135
_end:
×
UNCOV
2136
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
2137
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2138
  }
UNCOV
2139
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc