• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3610

12 Feb 2025 09:54AM UTC coverage: 54.713% (-8.4%) from 63.066%
#3610

push

travis-ci

web-flow
Merge pull request #29745 from taosdata/fix/TD33664-3.0

fix: --version show information check for 3.0

120957 of 286549 branches covered (42.21%)

Branch coverage included in aggregate %.

190849 of 283342 relevant lines covered (67.36%)

4969786.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.51
/source/libs/stream/src/tstreamFileState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "taos.h"
21
#include "tcommon.h"
22
#include "thash.h"
23
#include "tsimplehash.h"
24

25
#define FLUSH_RATIO                    0.5
26
#define FLUSH_NUM                      4
27
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
28
#define MIN_NUM_OF_ROW_BUFF            10240
29
#define MIN_NUM_OF_RECOVER_ROW_BUFF    128
30
#define MIN_NUM_SEARCH_BUCKET          128
31
#define MAX_ARRAY_SIZE                 1024
32
#define MAX_GROUP_ID_NUM               200000
33
#define NUM_OF_CACHE_WIN               64
34
#define MAX_NUM_OF_CACHE_WIN           128
35

36
#define TASK_KEY               "streamFileState"
37
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
38

39
struct SStreamFileState {
40
  SList*     usedBuffs;
41
  SList*     freeBuffs;
42
  void*      rowStateBuff;
43
  void*      pFileStore;
44
  int32_t    rowSize;
45
  int32_t    selectivityRowSize;
46
  int32_t    keyLen;
47
  uint64_t   preCheckPointVersion;
48
  uint64_t   checkPointVersion;
49
  TSKEY      maxTs;
50
  TSKEY      deleteMark;
51
  TSKEY      flushMark;
52
  uint64_t   maxRowCount;
53
  uint64_t   curRowCount;
54
  GetTsFun   getTs;
55
  char*      id;
56
  char*      cfName;
57
  void*      searchBuff;
58
  SSHashObj* pGroupIdMap;
59
  bool       hasFillCatch;
60

61
  _state_buff_cleanup_fn         stateBuffCleanupFn;
62
  _state_buff_remove_fn          stateBuffRemoveFn;
63
  _state_buff_remove_by_pos_fn   stateBuffRemoveByPosFn;
64
  _state_buff_create_statekey_fn stateBuffCreateStateKeyFn;
65

66
  _state_file_remove_fn stateFileRemoveFn;
67
  _state_file_get_fn    stateFileGetFn;
68

69
  _state_fun_get_fn stateFunctionGetFn;
70
};
71

72
typedef SRowBuffPos SRowBuffInfo;
73

74
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
378✔
75
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
378✔
76
  return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
378✔
77
}
78

79
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
1,226✔
80
  SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
1,226✔
81
  if (pos) {
1,226✔
82
    (*pos)->beFlushed = true;
1,131✔
83
  }
84
  return tSimpleHashRemove(pBuff, pKey, keyLen);
1,226✔
85
}
86

87
// Drops the hash entry for pPos->pKey, but only when that entry still maps to
// this exact position object; a different position under the same key is kept.
void stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  size_t        keyLen = pFileState->keyLen;
  SRowBuffPos** ppMapped = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
  if (ppMapped == NULL || (*ppMapped) != pPos) {
    return;
  }

  int32_t rmRes = tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
  qTrace("%s at line %d res:%d", __func__, __LINE__, rmRes);
}
6,741,298✔
97

98
// Clears the simple hash pBuff via tSimpleHashClear.
void stateHashBuffClearFn(void* pBuff) {
  tSimpleHashClear(pBuff);
}
×
99

100
// Cleanup callback for hash-backed row buffers: forwards to tSimpleHashCleanup.
void stateHashBuffCleanupFn(void* pBuff) {
  tSimpleHashCleanup(pBuff);
}
2,434✔
101

102
// Backing-store delete for interval state: forwards pKey to
// streamStateDel_rocksdb and returns its result.
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
  void* pStore = pFileState->pFileStore;
  return streamStateDel_rocksdb(pStore, pKey);
}
105

106
// Backing-store read for interval state: forwards to streamStateGet_rocksdb,
// which fills *data / *pDataLen, and returns its result.
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
  void* pStore = pFileState->pFileStore;
  return streamStateGet_rocksdb(pStore, pKey, data, pDataLen);
}
109

110
// Allocates an SStateKey made of the window key held by pPos plus the operator
// number num. Returns NULL (after logging) when the allocation fails; otherwise
// the caller receives the newly allocated key.
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SStateKey* pResult = taosMemoryCalloc(1, sizeof(SStateKey));
  if (!pResult) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return NULL;
  }

  SWinKey* pSrcKey = pPos->pKey;
  pResult->opNum = num;
  pResult->key = *pSrcKey;
  return pResult;
}
121

122
// Allocates a copy of the SWinKey held by pPos. The num argument is not used.
// Returns NULL (after logging) when the allocation fails.
void* defaultCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SWinKey* pCopy = taosMemoryCalloc(1, sizeof(SWinKey));
  if (!pCopy) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return NULL;
  }

  SWinKey* pSrcKey = pPos->pKey;
  *pCopy = *pSrcKey;
  return pCopy;
}
132

133
// Backing-store delete for session state: forwards pKey to
// streamStateSessionDel_rocksdb and returns its result.
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
  void* pStore = pFileState->pFileStore;
  return streamStateSessionDel_rocksdb(pStore, pKey);
}
136

137
// Backing-store read for session state: forwards to
// streamStateSessionGet_rocksdb and returns its result.
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
  void* pStore = pFileState->pFileStore;
  return streamStateSessionGet_rocksdb(pStore, pKey, data, pDataLen);
}
140

141
// Allocates an SStateSessionKey made of the session key held by pPos plus the
// operator number num. Returns NULL (after logging) when the allocation fails.
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SStateSessionKey* pResult = taosMemoryCalloc(1, sizeof(SStateSessionKey));
  if (!pResult) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return NULL;
  }

  SSessionKey* pSrcKey = pPos->pKey;
  pResult->opNum = num;
  pResult->key = *pSrcKey;
  return pResult;
}
152

153
// Decodes one fixed-width 64-bit value from pBuff into *pKey.
// len is accepted for symmetry with the encoder but is not consulted.
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) {
  // The advanced cursor returned by the decoder is not needed by any caller.
  (void)taosDecodeFixedI64(pBuff, pKey);
}
2!
154

155
// Encodes *pKey as a fixed-width 64-bit value into a freshly allocated buffer.
// On success *pVal owns the buffer and *pLen is sizeof(TSKEY). On allocation
// failure *pVal is NULL and terrno is returned.
static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
  *pLen = sizeof(TSKEY);
  void* pEncoded = taosMemoryCalloc(1, *pLen);
  (*pVal) = pEncoded;
  if (pEncoded == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  // The encoder advances its cursor argument, so use a scratch copy of the pointer.
  void* pCursor = pEncoded;
  (void)taosEncodeFixedI64(&pCursor, *pKey);
  return TSDB_CODE_SUCCESS;
}
166

167
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
2,758✔
168
                            void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type,
169
                            SStreamFileState** ppFileState) {
170
  int32_t code = TSDB_CODE_SUCCESS;
2,758✔
171
  int32_t lino = 0;
2,758✔
172
  if (memSize <= 0) {
2,758!
173
    memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
×
174
  }
175
  if (rowSize == 0) {
2,758!
176
    code = TSDB_CODE_INVALID_PARA;
×
177
    QUERY_CHECK_CODE(code, lino, _end);
×
178
  }
179

180
  SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
2,758!
181
  QUERY_CHECK_NULL(pFileState, code, lino, _end, terrno);
2,759!
182

183
  rowSize += selectRowSize;
2,759✔
184
  pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
2,759✔
185
  pFileState->usedBuffs = tdListNew(POINTER_BYTES);
2,759✔
186
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
2,759!
187

188
  pFileState->freeBuffs = tdListNew(POINTER_BYTES);
2,759✔
189
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
2,758!
190

191
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
2,758✔
192
  int32_t    cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
2,757✔
193
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
2,757✔
194
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
2,480✔
195
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
2,482✔
196
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
2,482✔
197
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
2,482✔
198
    pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
2,482✔
199

200
    pFileState->stateFileRemoveFn = intervalFileRemoveFn;
2,482✔
201
    pFileState->stateFileGetFn = intervalFileGetFn;
2,482✔
202
    pFileState->cfName = taosStrdup("state");
2,482!
203
    pFileState->stateFunctionGetFn = addRowBuffIfNotExist;
2,482✔
204
  } else if (type == STREAM_STATE_BUFF_SORT) {
277✔
205
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
265✔
206
    pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
265✔
207
    pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
265✔
208
    pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn;
265✔
209
    pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
265✔
210

211
    pFileState->stateFileRemoveFn = sessionFileRemoveFn;
265✔
212
    pFileState->stateFileGetFn = sessionFileGetFn;
265✔
213
    pFileState->cfName = taosStrdup("sess");
265!
214
    pFileState->stateFunctionGetFn = getSessionRowBuff;
265✔
215
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
12!
216
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
12✔
217
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
12✔
218
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
12!
219
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
12✔
220
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
12✔
221
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
12✔
222
    pFileState->stateBuffCreateStateKeyFn = defaultCreateStateKey;
12✔
223

224
    pFileState->stateFileRemoveFn = hashSortFileRemoveFn;
12✔
225
    pFileState->stateFileGetFn = hashSortFileGetFn;
12✔
226
    pFileState->cfName = taosStrdup("fill");
12!
227
    pFileState->stateFunctionGetFn = NULL;
12✔
228
  }
229

230
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
2,759!
231
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
2,759!
232
  QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno);
2,759!
233
  QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno);
2,759!
234

235
  if (type == STREAM_STATE_BUFF_HASH_SEARCH) {
2,759✔
236
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
16✔
237
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
16!
238
  }
239

240
  pFileState->keyLen = keySize;
2,759✔
241
  pFileState->rowSize = rowSize;
2,759✔
242
  pFileState->selectivityRowSize = selectRowSize;
2,759✔
243
  pFileState->preCheckPointVersion = 0;
2,759✔
244
  pFileState->checkPointVersion = 1;
2,759✔
245
  pFileState->pFileStore = pFile;
2,759✔
246
  pFileState->getTs = fp;
2,759✔
247
  pFileState->curRowCount = 0;
2,759✔
248
  pFileState->deleteMark = delMark;
2,759✔
249
  pFileState->flushMark = INT64_MIN;
2,759✔
250
  pFileState->maxTs = INT64_MIN;
2,759✔
251
  pFileState->id = taosStrdup(taskId);
2,759!
252
  QUERY_CHECK_NULL(pFileState->id, code, lino, _end, terrno);
2,759!
253

254
  pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
2,759✔
255
  QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
2,759!
256

257
  pFileState->hasFillCatch = true;
2,759✔
258

259
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
2,759✔
260
    code = recoverSnapshot(pFileState, checkpointId);
2,482✔
261
  } else if (type == STREAM_STATE_BUFF_SORT) {
277✔
262
    code = recoverSession(pFileState, checkpointId);
265✔
263
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
12!
264
    code = recoverFillSnapshot(pFileState, checkpointId);
12✔
265
  }
266
  QUERY_CHECK_CODE(code, lino, _end);
2,759!
267

268
  void*   valBuf = NULL;
2,759✔
269
  int32_t len = 0;
2,759✔
270
  int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
2,759✔
271
  if (tmpRes == TSDB_CODE_SUCCESS) {
2,759✔
272
    QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
2!
273
    streamFileStateDecode(&pFileState->flushMark, valBuf, len);
2✔
274
    qDebug("===stream===flushMark  read:%" PRId64, pFileState->flushMark);
2!
275
  }
276
  taosMemoryFreeClear(valBuf);
2,759!
277
  (*ppFileState) = pFileState;
2,759✔
278

279
_end:
2,759✔
280
  if (code != TSDB_CODE_SUCCESS) {
2,759!
281
    streamFileStateDestroy(pFileState);
×
282
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
283
  }
284
  return code;
2,759✔
285
}
286

287
// Frees a row position together with the key and row buffer it owns.
// pPos must not be NULL.
void destroyRowBuffPos(SRowBuffPos* pPos) {
  taosMemoryFreeClear(pPos->pRowBuff);
  taosMemoryFreeClear(pPos->pKey);
  taosMemoryFree(pPos);
}
6,364,439✔
292

293
void destroyRowBuffPosPtr(void* ptr) {
928,699✔
294
  if (!ptr) {
928,699!
295
    return;
×
296
  }
297
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
928,699✔
298
  if (!pPos->beUsed) {
928,699✔
299
    destroyRowBuffPos(pPos);
118✔
300
  }
301
}
302

303
void destroyRowBuffAllPosPtr(void* ptr) {
368,850✔
304
  if (!ptr) {
368,850!
305
    return;
×
306
  }
307
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
368,850✔
308
  destroyRowBuffPos(pPos);
368,850✔
309
}
310

311
// List-element destructor for the free list: ptr points at a slot holding a
// raw row buffer pointer, which is freed.
void destroyRowBuff(void* ptr) {
  if (ptr != NULL) {
    void* pRow = *(void**)ptr;
    taosMemoryFree(pRow);
  }
}
317

318
// Releases a file state and everything it owns. Accepts NULL and also a
// partially initialised state, as handed over by streamFileStateInit's error path.
void streamFileStateDestroy(SStreamFileState* pFileState) {
  if (!pFileState) {
    return;
  }

  taosMemoryFree(pFileState->id);
  taosMemoryFree(pFileState->cfName);
  tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
  tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
  // The cleanup callback is still NULL when initialisation failed before the
  // callback table was installed; calling through it would crash.
  if (pFileState->stateBuffCleanupFn != NULL) {
    pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
  }
  sessionWinStateCleanup(pFileState->searchBuff);
  tSimpleHashCleanup(pFileState->pGroupIdMap);
  taosMemoryFree(pFileState);
}
332

333
// Moves the row buffer of pPos (if it has one) onto the free list and clears
// pPos->pRowBuff. Returns the tdListAppend error code when the append fails, in
// which case pPos keeps its buffer.
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  if (pPos->pRowBuff == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  pPos->pRowBuff = NULL;
  return code;
}
348

349
// Walks usedBuffs and evicts positions. With all == true every position is
// evicted; otherwise only unused positions whose timestamp is below ts.
// An evicted position has its row buffer recycled onto the free list, is
// removed from the lookup buffer (skipped when all is set), and is destroyed.
// Stops at the first putFreeBuff failure and logs it.
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SListNode* pCur = NULL;
  SListIter  it = {0};

  tdListInitIter(pFileState->usedBuffs, &it, TD_LIST_FORWARD);
  for (pCur = tdListNext(&it); pCur != NULL; pCur = tdListNext(&it)) {
    SRowBuffPos* pPos = *(SRowBuffPos**)(pCur->data);
    bool         evict = all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed);
    if (!evict) {
      continue;
    }

    code = putFreeBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!all) {
      pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
    }
    destroyRowBuffPos(pPos);

    // The iterator has already advanced past pCur, so unlinking it is safe.
    SListNode* pDetached = tdListPopNode(pFileState->usedBuffs, pCur);
    taosMemoryFreeClear(pDetached);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
2,663✔
376

377
// Moves positions whose timestamp counts as already flushed (per isFlushedState)
// from usedBuffs into pFlushList, until max positions that carry a row buffer
// have been moved. With all == false only unused positions are taken; with
// all == true used ones are taken too, but positions without a row buffer are skipped.
// Returns the tdListAppend error code on failure.
int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool all) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  uint64_t   moved = 0;
  SListNode* pCur = NULL;
  SListIter  it = {0};

  tdListInitIter(pFileState->usedBuffs, &it, TD_LIST_FORWARD);
  while ((pCur = tdListNext(&it)) != NULL && moved < max) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pCur->data;
    if (!isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) {
      continue;
    }
    if (!all && pPos->beUsed) {
      continue;
    }
    if (all && !pPos->pRowBuff) {
      continue;
    }

    code = tdListAppend(pFlushList, &pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
    pFileState->stateBuffRemoveByPosFn(pFileState, pPos);

    SListNode* pDetached = tdListPopNode(pFileState->usedBuffs, pCur);
    taosMemoryFreeClear(pDetached);

    // Only positions that actually hold a buffer count towards the quota.
    if (pPos->pRowBuff) {
      moved++;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
412

413
// Resets the state to empty: both marks return to INT64_MIN, the lookup hash
// is cleared, and every tracked position is evicted with its buffer recycled.
void streamFileStateClear(SStreamFileState* pFileState) {
  pFileState->maxTs = INT64_MIN;
  pFileState->flushMark = INT64_MIN;

  tSimpleHashClear(pFileState->rowStateBuff);
  clearExpiredRowBuff(pFileState, 0, true);
}
446✔
419

420
// Reports whether the flush mark is positive.
bool needClearDiskBuff(SStreamFileState* pFileState) {
  return pFileState->flushMark > 0;
}
6✔
421

422
// Sets the in-use flag of pPos to used. pFileState is not consulted.
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) {
  pPos->beUsed = used;
}
6,112,865✔
423

424
// Moves positions whose beUsed flag equals used from usedBuffs into pFlushList,
// until max positions that carry a row buffer have been moved. A used position
// without a row buffer is skipped; it must have needFree set, otherwise the
// function fails with TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  uint64_t   moved = 0;
  SListNode* pCur = NULL;
  SListIter  it = {0};

  tdListInitIter(pFileState->usedBuffs, &it, TD_LIST_FORWARD);
  while ((pCur = tdListNext(&it)) != NULL && moved < max) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pCur->data;
    if (pPos->beUsed != used) {
      continue;
    }
    if (used && !pPos->pRowBuff) {
      QUERY_CHECK_CONDITION((pPos->needFree == true), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      continue;
    }

    code = tdListAppend(pFlushList, &pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
    pFileState->stateBuffRemoveByPosFn(pFileState, pPos);

    SListNode* pDetached = tdListPopNode(pFileState->usedBuffs, pCur);
    taosMemoryFreeClear(pDetached);

    // Only positions that actually hold a buffer count towards the quota.
    if (pPos->pRowBuff) {
      moved++;
    }
  }

  qInfo("stream state flush %d rows to disk. is used:%d", listNEles(pFlushList), used);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
460

461
/*
 * Frees up row buffers by flushing part of the in-memory state.
 *
 * Candidates are collected in this order, each step only if the previous one
 * found nothing: positions already covered by the flush mark, then unused
 * positions, then used positions. When a search buffer exists, all remaining
 * already-flushed positions are collected as well. The collected positions are
 * written through flushSnapshot and their row buffers recycled onto the free list.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
int32_t flushRowBuff(SStreamFileState* pFileState) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
  if (!pFlushList) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // Target: half of the allocated rows, but never fewer than FLUSH_NUM.
  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
  num = TMAX(num, FLUSH_NUM);
  code = clearFlushedRowBuff(pFileState, pFlushList, num, false);
  QUERY_CHECK_CODE(code, lino, _end);

  if (isListEmpty(pFlushList)) {
    code = popUsedBuffs(pFileState, pFlushList, num, false);
    QUERY_CHECK_CODE(code, lino, _end);

    if (isListEmpty(pFlushList)) {
      code = popUsedBuffs(pFileState, pFlushList, num, true);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  if (pFileState->searchBuff) {
    code = clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  flushSnapshot(pFileState, pFlushList, false);

  SListIter fIter = {0};
  tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&fIter)) != NULL) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    code = putFreeBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  // Release the list on every path. The collected positions were already
  // unlinked from usedBuffs and the lookup buffer, so on an error path nothing
  // else would ever free them. Positions still in use are kept by the destructor.
  if (pFlushList != NULL) {
    tdListFreeP(pFlushList, destroyRowBuffPosPtr);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
509

510
// Tries to make row buffers available: first evicts expired, unused positions
// (when expiry is enabled), then falls back to flushing if the free list is
// still empty. Returns TSDB_CODE_SUCCESS or the flushRowBuff error code.
int32_t clearRowBuff(SStreamFileState* pFileState) {
  // Expiry is disabled when deleteMark is INT64_MAX. It is also skipped while
  // no timestamp has been seen yet: maxTs is INT64_MIN then, and subtracting
  // deleteMark would overflow. getSnapshot applies the same guard.
  if (pFileState->deleteMark != INT64_MAX && pFileState->maxTs != INT64_MIN) {
    clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
  }
  if (isListEmpty(pFileState->freeBuffs)) {
    return flushRowBuff(pFileState);
  }
  return TSDB_CODE_SUCCESS;
}
519

520
// Takes one recycled row buffer off the head of the free list and zeroes it
// (rowSize bytes). Returns NULL when the free list is empty.
void* getFreeBuff(SStreamFileState* pFileState) {
  int32_t    rowBytes = pFileState->rowSize;
  SListNode* pHead = tdListPopHead(pFileState->freeBuffs);
  if (pHead == NULL) {
    return NULL;
  }

  void* pRow = *(void**)pHead->data;
  memset(pRow, 0, rowBytes);
  taosMemoryFree(pHead);
  return pRow;
}
532

533
// Zeroes the row buffer of pPos (rowSize bytes); does nothing when the
// position has no row buffer.
void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  if (pPos->pRowBuff == NULL) {
    return;
  }
  memset(pPos->pRowBuff, 0, pFileState->rowSize);
}
×
538

539
/*
 * Creates a new row position with a zeroed key and a row buffer, and appends
 * it to usedBuffs.
 *
 * The row buffer comes from the free list when possible. Otherwise a new one
 * is allocated while the row budget allows it; once the budget is exhausted,
 * clearRowBuff is asked to reclaim buffers first.
 *
 * Returns the new position, or NULL on failure. On failure nothing is added to
 * usedBuffs and everything acquired here is released again.
 */
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
  QUERY_CHECK_NULL(pPos, code, lino, _error, terrno);

  pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
  QUERY_CHECK_NULL(pPos->pKey, code, lino, _error, terrno);

  pPos->pRowBuff = getFreeBuff(pFileState);
  if (pPos->pRowBuff == NULL) {
    if (pFileState->curRowCount < pFileState->maxRowCount) {
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
      QUERY_CHECK_NULL(pPos->pRowBuff, code, lino, _error, terrno);
      pFileState->curRowCount++;
    } else {
      code = clearRowBuff(pFileState);
      QUERY_CHECK_CODE(code, lino, _error);
      pPos->pRowBuff = getFreeBuff(pFileState);
    }
  }

  // Validate before linking, so that a position without a buffer never ends
  // up in usedBuffs.
  QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  code = tdListAppend(pFileState->usedBuffs, &pPos);
  QUERY_CHECK_CODE(code, lino, _error);

  return pPos;

_error:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pPos != NULL) {
    // Prefer handing the buffer back to the free list. If even that fails,
    // destroyRowBuffPos frees it, so it must no longer count as allocated.
    if (pPos->pRowBuff != NULL && putFreeBuff(pFileState, pPos) != TSDB_CODE_SUCCESS) {
      if (pFileState->curRowCount > 0) {
        pFileState->curRowCount--;
      }
    }
    destroyRowBuffPos(pPos);
  }
  return NULL;
}
586

587
// Obtains a new row position via getNewRowPos and marks it as in use, updated,
// not flushed and not needing a free. Returns NULL (after logging
// TSDB_CODE_OUT_OF_MEMORY) when no position could be obtained.
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
  SRowBuffPos* pFresh = getNewRowPos(pFileState);
  if (pFresh == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return NULL;
  }

  pFresh->beUsed = true;
  pFresh->beUpdated = true;
  pFresh->beFlushed = false;
  pFresh->needFree = false;
  return pFresh;
}
607

608
/*
 * Looks up the row position for pKey, creating it when it is not in memory.
 *
 * pKey / keyLen  key to look up
 * pVal           when non-NULL, receives the SRowBuffPos*; pVLen then receives rowSize
 * pWinCode       TSDB_CODE_SUCCESS when the row already existed (in memory or on
 *                disk); otherwise TSDB_CODE_FAILED or the disk lookup's result
 *
 * A newly created row is pre-filled from the backing store when its timestamp
 * is not deleted and counts as already flushed.
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the hash insert error.
 */
int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                             int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  TSKEY   ts = pFileState->getTs(pKey);

  (*pWinCode) = TSDB_CODE_SUCCESS;
  pFileState->maxTs = TMAX(pFileState->maxTs, ts);

  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  if (pos) {
    if (pVal != NULL) {
      *pVLen = pFileState->rowSize;
      *pVal = *pos;
      (*pos)->beUsed = true;
      (*pos)->beFlushed = false;
    }
    goto _end;
  }

  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
  if (!pNewPos || !pNewPos->pRowBuff) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pNewPos->pKey, pKey, keyLen);
  (*pWinCode) = TSDB_CODE_FAILED;

  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
    int32_t len = 0;
    void*   p = NULL;
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
    if ((*pWinCode) == TSDB_CODE_SUCCESS && p != NULL && len > 0) {
      // The row buffer holds exactly rowSize bytes; never copy more than that.
      memcpy(pNewPos->pRowBuff, p, TMIN(len, pFileState->rowSize));
    }
    taosMemoryFree(p);
  }

  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
  if (code != TSDB_CODE_SUCCESS) {
    // The position is already linked into usedBuffs but will never reach the
    // caller. Mark it unused so the regular eviction passes can reclaim it.
    pNewPos->beUsed = false;
  }
  QUERY_CHECK_CODE(code, lino, _end);

  if (pVal) {
    *pVLen = pFileState->rowSize;
    *pVal = pNewPos;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
659

660
// Deletes pKey from the in-memory buffer and from the backing store, tracing
// both results, and also from the search buffer when one exists.
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
  int32_t memRes = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
  qTrace("%s at line %d res:%d", __func__, __LINE__, memRes);

  int32_t diskRes = pFileState->stateFileRemoveFn(pFileState, pKey);
  qTrace("%s at line %d res:%d", __func__, __LINE__, diskRes);

  if (pFileState->searchBuff == NULL) {
    return;
  }
  deleteHashSortRowBuff(pFileState, pKey);
}
1,727✔
669

670
// Reloads the value stored for pPos->pKey from the backing store into
// pPos->pRowBuff. Returns the stateFileGetFn error code when the lookup fails.
// NOTE(review): the copy length comes from the store and is not checked against
// the size of pRowBuff here - confirm that callers guarantee it fits.
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  void*   pDiskVal = NULL;
  int32_t valLen = 0;

  int32_t code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pDiskVal, &valLen);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  memcpy(pPos->pRowBuff, pDiskVal, valLen);
  taosMemoryFree(pDiskVal);
  return code;
}
686

687
// Gives pPos a row buffer and refills it from the backing store. The buffer
// comes from the free list, from a fresh allocation while the row budget
// allows, or from the free list again after clearRowBuff has reclaimed space.
static int32_t recoverStateRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  pPos->pRowBuff = getFreeBuff(pFileState);
  if (pPos->pRowBuff == NULL) {
    bool budgetExhausted = !(pFileState->curRowCount < pFileState->maxRowCount);
    if (budgetExhausted) {
      code = clearRowBuff(pFileState);
      QUERY_CHECK_CODE(code, lino, _end);
      pPos->pRowBuff = getFreeBuff(pFileState);
    } else {
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
      if (pPos->pRowBuff == NULL) {
        code = terrno;
        QUERY_CHECK_CODE(code, lino, _end);
      }
      pFileState->curRowCount++;
    }
    QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  code = recoverSessionRowBuff(pFileState, pPos);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
716

717
// Returns the row buffer of pPos through *pVal. A position that still holds
// its buffer is refreshed from the backing store only when needFree is set.
// A position without a buffer is recovered from the backing store first and,
// unless needFree is set, put at the front of usedBuffs.
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pPos->pRowBuff == NULL) {
    code = recoverStateRowBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    (*pVal) = pPos->pRowBuff;
    if (!pPos->needFree) {
      code = tdListPrepend(pFileState->usedBuffs, &pPos);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  } else {
    if (pPos->needFree) {
      code = recoverSessionRowBuff(pFileState, pPos);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    (*pVal) = pPos->pRowBuff;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
744

745
// Reports whether pKey currently has an entry in the in-memory lookup buffer.
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
  SRowBuffPos** ppFound = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  return ppFound != NULL;
}
752

753
// Evicts expired, unused positions and returns the list of tracked positions.
// The expiry threshold is maxTs - deleteMark. It falls back to INT64_MIN when
// expiry is disabled (deleteMark == INT64_MAX) or no timestamp was seen yet.
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
  int64_t mark = INT64_MIN;
  if (pFileState->deleteMark != INT64_MAX && pFileState->maxTs != INT64_MIN) {
    mark = pFileState->maxTs - pFileState->deleteMark;
  }

  clearExpiredRowBuff(pFileState, mark, false);
  return pFileState->usedBuffs;
}
760

761
/*
 * Write every not-yet-flushed row of pSnapshot to the file store in batches.
 *
 * Rows already marked beFlushed, or without a row buffer, are skipped. Each
 * remaining row is marked beFlushed, pFileState->flushMark is raised to the
 * row's timestamp, and the row is added to a write batch. The batch is
 * committed whenever it already holds BATCH_LIMIT entries and once more after
 * the loop. When that final batch is non-empty the search buffer is cleared
 * and, if flushState is set, the encoded flushMark is stored under
 * STREAM_STATE_INFO_NAME.
 *
 * Failures are logged; nothing is reported back to the caller.
 */
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  // Both are released at _end, so they need a defined value before the first jump there.
  char*     buf = NULL;
  void*     batch = NULL;
  int32_t   numOfElems = 0;
  SListIter iter = {0};
  tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);

  const int32_t BATCH_LIMIT = 256;

  int64_t    st = taosGetTimestampMs();
  SListNode* pNode = NULL;

  int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);

  // Scratch buffer passed to streamStatePutBatchOptimize() for every row.
  int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
  buf = taosMemoryCalloc(1, len);
  QUERY_CHECK_NULL(buf, code, lino, _end, terrno);

  batch = streamStateCreateBatch();
  QUERY_CHECK_NULL(batch, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);

  while ((pNode = tdListNext(&iter)) != NULL) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (pPos->beFlushed || !pPos->pRowBuff) {
      continue;
    }
    pPos->beFlushed = true;
    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));

    qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
    // Commit the pending batch before it grows past the limit.
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
      code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
      streamStateClearBatch(batch);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
    QUERY_CHECK_NULL(pSKey, code, lino, _end, terrno);

    code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
                                       0, buf);
    taosMemoryFreeClear(pSKey);
    QUERY_CHECK_CODE(code, lino, _end);
    // todo handle failure
    memset(buf, 0, len);
  }

  numOfElems = streamStateGetBatchSize(batch);
  if (numOfElems <= 0) {
    // Nothing left to commit: skip the search-buffer reset and the flushMark write.
    goto _end;
  }

  code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
  QUERY_CHECK_CODE(code, lino, _end);

  streamStateClearBatch(batch);

  clearSearchBuff(pFileState);

  int64_t elapsed = taosGetTimestampMs() - st;
  qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
         pFileState->id, numOfElems, BATCH_LIMIT, elapsed);

  if (flushState) {
    void*   valBuf = NULL;
    int32_t valLen = 0;
    code = streamFileStateEncode(&pFileState->flushMark, &valBuf, &valLen);
    QUERY_CHECK_CODE(code, lino, _end);

    qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
    code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, valLen, 0);
    taosMemoryFree(valBuf);
    QUERY_CHECK_CODE(code, lino, _end);

    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(buf);
  if (batch != NULL) {
    streamStateDestroyBatch(batch);
  }
}
2,836✔
852

853
/*
 * Delete the entry "<TASK_KEY>:<checkpointId>" from the file store and return
 * the result of streamDefaultDel_rocksdb().
 */
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
  char checkpointKey[128] = {0};

  TAOS_UNUSED(tsnprintf(checkpointKey, sizeof(checkpointKey), "%s:%" PRId64 "", TASK_KEY, checkpointId));

  int32_t code = streamDefaultDel_rocksdb(pFileState->pFileStore, checkpointKey);
  return code;
}
858

859
/*
 * Remove the newest checkpoint entry whose stored timestamp is below mark.
 *
 * The value stored under TASK_KEY is parsed as the highest checkpoint id.
 * Ids are then visited from that value down to 1; for each one the value
 * stored under "<TASK_KEY>:<id>" is parsed as a timestamp, and the first id
 * whose timestamp is smaller than mark is removed, which ends the scan.
 *
 * Returns TSDB_CODE_FAILED when a lookup fails or a stored value does not fit
 * the local parse buffer; otherwise the code of the last lookup.
 */
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t maxCheckPointId = 0;
  {
    char    buf[128] = {0};
    void*   val = NULL;
    int32_t len = 0;
    memcpy(buf, TASK_KEY, strlen(TASK_KEY));
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
    if (code != 0) {
      return TSDB_CODE_FAILED;
    }
    // The value is copied into buf and NUL-terminated, so it must leave room for the terminator.
    if (val == NULL || len <= 0 || len >= (int32_t)sizeof(buf)) {
      taosMemoryFree(val);
      return TSDB_CODE_FAILED;
    }
    memcpy(buf, val, len);
    buf[len] = 0;
    maxCheckPointId = taosStr2Int64((char*)buf, NULL, 10);
    taosMemoryFree(val);
  }
  for (int64_t i = maxCheckPointId; i > 0; i--) {
    char    buf[128] = {0};
    void*   val = NULL;
    int32_t len = 0;
    TAOS_UNUSED(tsnprintf(buf, sizeof(buf), "%s:%" PRId64 "", TASK_KEY, i));
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
    if (code != 0) {
      return TSDB_CODE_FAILED;
    }
    // Reject lengths that would overrun buf, and a missing value that claims a length.
    if (len < 0 || len >= (int32_t)sizeof(buf) || (val == NULL && len != 0)) {
      taosMemoryFree(val);
      return TSDB_CODE_FAILED;
    }
    if (len > 0) {
      memcpy(buf, val, len);
    }
    buf[len] = 0;
    taosMemoryFree(val);

    TSKEY ts = taosStr2Int64((char*)buf, NULL, 10);
    if (ts < mark) {
      // statekey winkey.ts < mark
      int32_t tmpRes = forceRemoveCheckpoint(pFileState, i);
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
      break;
    }
  }
  return code;
}
900

901
/*
 * Reload the most recent session windows from the file store into memory.
 *
 * When maxTs is set, expired checkpoints are removed first (best effort; the
 * result is only traced). The file store is then walked backwards from its
 * last session entry, and each entry is turned into a session window buffer
 * until curRowCount reaches min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount) or
 * the cursor/put step reports a non-success result.
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a stored value does not
 * match rowSize, TSDB_CODE_OUT_OF_MEMORY when no window buffer could be
 * created, otherwise TSDB_CODE_SUCCESS. ckId is not used.
 */
int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t winRes = TSDB_CODE_SUCCESS;
  if (pFileState->maxTs != INT64_MIN) {
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
  }

  SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore, INT64_MAX);
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  while (winRes == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*       pVal = NULL;
    int32_t     vlen = 0;
    SSessionKey key = {0};
    winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }

    if (vlen != pFileState->rowSize) {
      // The value has not been handed to anyone yet, so it must be released here.
      taosMemoryFreeClear(pVal);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
    if (pPos == NULL) {
      // NOTE(review): whether createSessionWinBuff() releases pVal when it fails is not visible here - confirm.
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    pPos->beUsed = false;
    winRes = putSessionWinResultBuff(pFileState, pPos);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }

    winRes = streamStateSessionCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
950

951
/*
 * Reload the most recent rows from the file store into rowStateBuff.
 *
 * When maxTs is set, expired checkpoints are removed first (best effort; the
 * result is only traced). The file store is then walked backwards from its
 * last entry. Each entry is copied into a new row position and registered in
 * rowStateBuff, until curRowCount reaches
 * min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount), the cursor is exhausted, or
 * an entry older than flushMark is met.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when no row position is available,
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR on a row-size mismatch, the
 * tSimpleHashPut() code when registration fails, otherwise
 * TSDB_CODE_SUCCESS. ckId is not used.
 */
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t winCode = TSDB_CODE_SUCCESS;
  if (pFileState->maxTs != INT64_MIN) {
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
  }

  SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  while (winCode == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*        pVal = NULL;
    int32_t      vlen = 0;
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
    qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__);
    if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
      // The position just obtained is not needed: destroy it and drop its node from the tail of usedBuffs.
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      taosMemoryFreeClear(pVal);
      break;
    }
    if (vlen != pFileState->rowSize) {
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      taosMemoryFreeClear(pVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    memcpy(pNewPos->pRowBuff, pVal, vlen);
    taosMemoryFreeClear(pVal);
    pNewPos->beFlushed = true;
    pNewPos->beUsed = false;
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
    code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // Same clean-up as the bail-out above, so usedBuffs does not keep a node for the destroyed position.
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      break;
    }
    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
1013

1014
/* Accessor for pFileState->selectivityRowSize. */
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) {
  return pFileState->selectivityRowSize;
}
6✔
1015

1016
/* Raise flushMark and maxTs to ts when ts is larger than their current values. */
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
  if (ts > pFileState->flushMark) {
    pFileState->flushMark = ts;
  }
  if (ts > pFileState->maxTs) {
    pFileState->maxTs = ts;
  }
}
1,071✔
1020

1021
/* Accessor for pFileState->rowStateBuff. */
void* getRowStateBuff(SStreamFileState* pFileState) {
  return pFileState->rowStateBuff;
}
10,134✔
1022
/* Accessor for pFileState->searchBuff. */
void* getSearchBuff(SStreamFileState* pFileState) {
  return pFileState->searchBuff;
}
7,444,204✔
1023

1024
/* Accessor for pFileState->pFileStore. */
void* getStateFileStore(SStreamFileState* pFileState) {
  return pFileState->pFileStore;
}
5,861✔
1025

1026
/*
 * Tell whether ts lies below the delete threshold maxTs - deleteMark.
 * Always false when deleteMark is INT64_MAX or maxTs is not positive.
 */
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
  if (pFileState->deleteMark == INT64_MAX) {
    return false;
  }
  if (pFileState->maxTs <= 0) {
    return false;
  }
  return ts < (pFileState->maxTs - pFileState->deleteMark);
}
1030

1031
/* Tell whether ts is not later than flushMark + gap. */
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) {
  TSKEY limit = pFileState->flushMark + gap;
  return ts <= limit;
}
5,636,795✔
1032

1033
/* Accessor for pFileState->flushMark. */
TSKEY getFlushMark(SStreamFileState* pFileState) { return pFileState->flushMark; }
48✔
1034

1035
/* Accessor for pFileState->rowSize. */
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
  return pFileState->rowSize;
}
13✔
1036

1037
/*
 * Forward the lookup to pFileState->stateFunctionGetFn and return its code.
 * The window code produced by the callback is discarded.
 */
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
  int32_t discardedWinCode = TSDB_CODE_SUCCESS;
  int32_t code = pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen, &discardedWinCode);
  return code;
}
1041

1042
/*
 * Reload the most recent fill rows from the file store into rowStateBuff.
 *
 * When maxTs is set, expired checkpoints are removed first; unlike the other
 * recover functions in this file, a failure there aborts the recovery. The
 * file store is then walked backwards from its last fill entry. Each entry is
 * copied into a new row position and registered in rowStateBuff, until
 * curRowCount reaches min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount), the
 * cursor is exhausted, or an already-flushed entry is met.
 *
 * Returns the deleteExpiredCheckPoint() code when that step fails,
 * TSDB_CODE_OUT_OF_MEMORY when no row position is available,
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR on a row-size mismatch, otherwise
 * TSDB_CODE_SUCCESS. A tSimpleHashPut() failure only ends the loop.
 * ckId is not used.
 */
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  // Released at _end, so every exit path after the seek frees the cursor.
  SStreamStateCur* pCur = NULL;
  if (pFileState->maxTs != INT64_MIN) {
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    code = deleteExpiredCheckPoint(pFileState, mark);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore);
  if (pCur == NULL) {
    return code;
  }
  int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  int32_t winRes = TSDB_CODE_SUCCESS;
  while (winRes == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*        pVal = NULL;
    int32_t      vlen = 0;
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
    qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__);
    if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
      // The position just obtained is not needed: destroy it and drop its node from the tail of usedBuffs.
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      taosMemoryFreeClear(pVal);
      break;
    }

    if (vlen != pFileState->rowSize) {
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
      // Same clean-up as the bail-out above, so usedBuffs does not keep a node for the destroyed position.
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      taosMemoryFreeClear(pVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    memcpy(pNewPos->pRowBuff, pVal, vlen);
    taosMemoryFreeClear(pVal);
    pNewPos->beFlushed = true;
    pNewPos->beUsed = false;
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
    winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
    if (winRes != TSDB_CODE_SUCCESS) {
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      break;
    }
    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  if (pCur != NULL) {
    streamStateFreeCur(pCur);
  }
  return code;
}
1105

1106
/*
 * Look up the row position for pKey, first in rowStateBuff and then, for
 * keys that are flushed and not deleted, in the state file.
 *
 * pFileState->maxTs is raised to the key's timestamp. *pWinCode starts as
 * TSDB_CODE_FAILED and becomes TSDB_CODE_SUCCESS on an in-memory hit, or the
 * result of stateFileGetFn on a file lookup.
 *
 * On an in-memory hit the position is marked used and not flushed, its row
 * buffer is restored if it is missing, and *pVal / *pVLen receive the
 * position and rowSize. On a file hit the value is copied into a new
 * position that is registered in rowStateBuff; *pVal / *pVLen are written
 * only when pVal is non-NULL.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when no row position is
 * available, or the code of the failing callee.
 */
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                   int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  // Value read from the state file; released at _end so error paths do not leak it.
  void*   p = NULL;
  (*pWinCode) = TSDB_CODE_FAILED;
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  if (ppPos) {
    *pVLen = pFileState->rowSize;
    *pVal = *ppPos;
    (*ppPos)->beUsed = true;
    (*ppPos)->beFlushed = false;
    (*pWinCode) = TSDB_CODE_SUCCESS;
    if ((*ppPos)->pRowBuff == NULL) {
      code = recoverStateRowBuff(pFileState, *ppPos);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }
  TSKEY ts = pFileState->getTs(pKey);
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
    int32_t len = 0;
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
      if (!pNewPos || !pNewPos->pRowBuff) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        QUERY_CHECK_CODE(code, lino, _end);
      }

      memcpy(pNewPos->pKey, pKey, keyLen);
      memcpy(pNewPos->pRowBuff, p, len);
      code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
      QUERY_CHECK_CODE(code, lino, _end);

      if (pVal) {
        *pVLen = pFileState->rowSize;
        *pVal = pNewPos;
      }
    }
  }

_end:
  taosMemoryFree(p);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1157

1158
/*
 * Record groupId in the group-id cache and in the file store.
 *
 * A non-NULL value is rejected with TSDB_CODE_INVALID_PARA. If groupId is
 * already cached nothing is done. Otherwise it is added to pGroupIdMap while
 * the map holds at most MAX_GROUP_ID_NUM entries, and it is always written
 * with streamStatePutParTag_rocksdb().
 */
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (value != NULL) {
    code = TSDB_CODE_INVALID_PARA;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  void* pCached = tSimpleHashGet(pFileState->pGroupIdMap, &groupId, sizeof(int64_t));
  if (pCached != NULL) {
    goto _end;
  }

  if (tSimpleHashGetSize(pFileState->pGroupIdMap) <= MAX_GROUP_ID_NUM) {
    code = tSimpleHashPut(pFileState->pGroupIdMap, &groupId, sizeof(int64_t), NULL, 0);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = streamStatePutParTag_rocksdb(pFileState->pFileStore, groupId, value, vLen);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1181

1182
/*
 * Advance a group-id cursor by one entry.
 *
 * With hashIter == -1 the cursor is advanced in the file store. Otherwise
 * minGpId is raised to the current cached group id and the cursor moves to
 * the next entry of pGroupIdMap; once the map is exhausted hashIter is set to
 * -1 and the file-store cursor is positioned with
 * streamStateParTagSeekKeyNext_rocksdb() using minGpId.
 */
void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
  SStreamFileState* pState = (SStreamFileState*)pCur->pStreamFileState;

  if (pCur->hashIter == -1) {
    streamStateCurNext(pState->pFileStore, pCur);
    return;
  }

  int64_t curGroupId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
  pCur->minGpId = TMAX(pCur->minGpId, curGroupId);

  pCur->pHashData = tSimpleHashIterate(pState->pGroupIdMap, pCur->pHashData, &pCur->hashIter);
  if (pCur->pHashData != NULL) {
    return;
  }

  // Cached ids are used up: continue from the file store.
  pCur->hashIter = -1;
  streamStateParTagSeekKeyNext_rocksdb(pState->pFileStore, pCur->minGpId, pCur);
}
1200

1201
/*
 * Read the group id the cursor points at into *pKey.
 *
 * While the cursor still refers to a cached entry (pHashData is set) the id
 * comes from that entry; otherwise the call is forwarded to
 * streamStateParTagGetKVByCur_rocksdb(). pVal and pVLen are not used.
 */
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
  if (pCur->pHashData == NULL) {
    return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL);
  }

  *pKey = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
  return TSDB_CODE_SUCCESS;
}
1209

1210
/* Accessor for pFileState->pGroupIdMap. */
SSHashObj* getGroupIdCache(SStreamFileState* pFileState) { return pFileState->pGroupIdMap; }
1213

1214
/*
 * Drop every window key except the last one of each array in the search buffer.
 *
 * For each array stored in searchBuff, all keys but the last are removed from
 * rowStateBuff through stateBuffRemoveFn and, when the key is already
 * flushed, from the state file through stateFileRemoveFn (both results are
 * only traced). Those keys are then removed from the array itself. Finally
 * clearRowBuff() is called and a failure there is logged.
 */
void clearExpiredState(SStreamFileState* pFileState) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pGroups = pFileState->searchBuff;
  int32_t    hashIter = 0;
  void*      pEntry = tSimpleHashIterate(pGroups, NULL, &hashIter);

  for (; pEntry != NULL; pEntry = tSimpleHashIterate(pGroups, pEntry, &hashIter)) {
    SArray* pKeys = *((void**)pEntry);
    int32_t total = taosArrayGetSize(pKeys);
    int32_t numOfExpired = total - 1;

    int32_t k = 0;
    while (k < numOfExpired) {
      SWinKey* pWinKey = taosArrayGet(pKeys, k);
      int32_t  code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pWinKey, sizeof(SWinKey));
      qTrace("clear expired buff, ts:%" PRId64 ". %s at line %d res:%d", pWinKey->ts, __func__, __LINE__, code_buff);

      if (isFlushedState(pFileState, pWinKey->ts, 0)) {
        int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pWinKey);
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pWinKey->ts, __func__, __LINE__, code_file);
      }
      k++;
    }
    taosArrayRemoveBatch(pKeys, 0, numOfExpired, NULL);
  }

  code = clearRowBuff(pFileState);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
696✔
1243

1244
#ifdef BUILD_NO_CALL
1245
/*
 * Make sure pKey has a row buffer and is tracked in the per-group search array.
 *
 * The row buffer is obtained with addRowBuffIfNotExist(). The group's key
 * array is created on first use. When it is empty and needClearDiskBuff()
 * holds, up to NUM_OF_CACHE_WIN keys of the group are loaded from the file
 * store and sorted. pKey is inserted after the position found by
 * binarySearch() when it is not flushed or when a position was found and the
 * key there differs. If the array had at least MAX_NUM_OF_CACHE_WIN entries
 * on entry, the leading size - NUM_OF_CACHE_WIN keys are dropped.
 */
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
                           int32_t* pWinCode) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  // Released at _end as well, so a failure while loading keys does not leak the cursor.
  SStreamStateCur* pCur = NULL;

  code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
  QUERY_CHECK_CODE(code, lino, _end);

  SArray*    pWinStates = NULL;
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
  void**     ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);

    code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // The array was not stored in the map, so it has to be released here.
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // recover
  if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
    TSKEY   ts = getFlushMark(pFileState);
    SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX};
    void*   pState = getStateFileStore(pFileState);
    pCur = streamStateSeekKeyPrev_rocksdb(pState, &start);
    for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
      SWinKey tmpKey = {.groupId = pKey->groupId};
      int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pState, pCur, &tmpKey, NULL, 0);
      if (tmpRes != TSDB_CODE_SUCCESS) {
        break;
      }
      void* tmp = taosArrayPush(pWinStates, &tmpKey);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      streamStateCurPrev_rocksdb(pCur);
    }
    taosArraySort(pWinStates, winKeyCmprImpl);
    streamStateFreeCur(pCur);
    pCur = NULL;
  }

  int32_t size = taosArrayGetSize(pWinStates);
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
  if (!isFlushedState(pFileState, pKey->ts, 0)|| index >= 0) {
    // find the first position which is smaller than the pKey
    if (index >= 0) {
      SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
      if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
        goto _end;
      }
    }
    index++;
    void* tmp = taosArrayInsert(pWinStates, index, pKey);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

  if (size >= MAX_NUM_OF_CACHE_WIN) {
    int32_t num = size - NUM_OF_CACHE_WIN;
    taosArrayRemoveBatch(pWinStates, 0, num, NULL);
  }

_end:
  if (pCur != NULL) {
    streamStateFreeCur(pCur);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1312
#endif
1313

1314
/*
 * Seek the file store to the entry before pKey and, when one is found, copy
 * its value into a newly obtained row position.
 *
 * *pWinCode receives the result of the file-store read and pResKey the key
 * that was read. On a hit *ppVal is set to the new position and *pVLen to the
 * row size. The cursor and the temporary value are released on every path.
 * Returns TSDB_CODE_OUT_OF_MEMORY when no row position is available.
 */
static int32_t loadPrevRowFromFileStore(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey,
                                        void** ppVal, int32_t* pVLen, int32_t* pWinCode) {
  int32_t          code = TSDB_CODE_SUCCESS;
  void*            pState = getStateFileStore(pFileState);
  SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
  void*            tmpVal = NULL;
  int32_t          len = 0;

  (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
  if ((*pWinCode) == TSDB_CODE_SUCCESS) {
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(pNewPos->pRowBuff, tmpVal, len);
      *pVLen = getRowStateRowSize(pFileState);
      (*ppVal) = pNewPos;
    }
    taosMemoryFreeClear(tmpVal);
  }
  streamStateFreeCur(pCur);
  return code;
}

/*
 * Find the row that precedes pKey within the same group.
 *
 * If the group has no search array, or the array holds nothing before pKey
 * (index becomes -1), the previous row is read from the file store.
 * Otherwise the key at the computed index is copied to *pResKey and its row
 * buffer is obtained through addRowBuffIfNotExist(), whose code is returned.
 *
 * When binarySearch() lands on a key different from pKey, that key itself is
 * used as the previous one and a debug message is emitted.
 */
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
                           int32_t* pVLen, int32_t* pWinCode) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int32_t    size = 0;
  int32_t    index = -1;
  SArray*    pWinStates = NULL;
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
  void**     ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff == NULL) {
    qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
    code = loadPrevRowFromFileStore(pFileState, pKey, pResKey, ppVal, pVLen, pWinCode);
    QUERY_CHECK_CODE(code, lino, _end);
    goto _end;
  }

  pWinStates = (SArray*)(*ppBuff);
  size = taosArrayGetSize(pWinStates);
  index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
  if (index >= 0) {
    SWinKey* pCurKey = taosArrayGet(pWinStates, index);
    if (winKeyCmprImpl(pCurKey, pKey) == 0) {
      index--;
    } else {
      qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__);
    }
  }

  if (index == -1) {
    code = loadPrevRowFromFileStore(pFileState, pKey, pResKey, ppVal, pVLen, pWinCode);
    QUERY_CHECK_CODE(code, lino, _end);
    goto _end;
  }

  {
    SWinKey* pPrevKey = taosArrayGet(pWinStates, index);
    *pResKey = *pPrevKey;
    return addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), ppVal, pVLen, pWinCode);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1385

1386
/*
 * Insert pKey into the sorted key array pWinStates unless it is already there.
 *
 * The insert is attempted when pKey is not flushed, when binarySearch() found
 * a position, or when the array is empty; it is skipped if the key at the
 * found position equals pKey. If the array had at least MAX_NUM_OF_CACHE_WIN
 * entries on entry, the leading size - NUM_OF_CACHE_WIN keys are removed.
 */
int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfKeys = taosArrayGetSize(pWinStates);
  int32_t pos = binarySearch(pWinStates, numOfKeys, pKey, fillStateKeyCompare);

  bool tryInsert = !isFlushedState(pFileState, pKey->ts, 0) || pos >= 0 || numOfKeys == 0;
  if (tryInsert) {
    if (pos >= 0) {
      SWinKey* pFound = taosArrayGet(pWinStates, pos);
      if (winKeyCmprImpl(pFound, pKey) == 0) {
        // Already tracked.
        goto _end;
      }
    }
    void* tmp = taosArrayInsert(pWinStates, pos + 1, pKey);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

  if (numOfKeys >= MAX_NUM_OF_CACHE_WIN) {
    taosArrayRemoveBatch(pWinStates, 0, numOfKeys - NUM_OF_CACHE_WIN, NULL);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1413

1414
/*
 * Return, through ppResStates, the SWinKey array stored for groupId in
 * pSearchBuff, creating and registering an empty one when the group has none.
 *
 * Returns terrno when the array cannot be created, or the tSimpleHashPut()
 * code when it cannot be registered; *ppResStates is left untouched in both
 * cases.
 */
int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pWinStates = NULL;
  void**  ppBuff = tSimpleHashGet(pSearchBuff, &groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);

    code = tSimpleHashPut(pSearchBuff, &groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // The array was not stored in the map, so it has to be released here.
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  (*ppResStates) = pWinStates;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc