• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3631

07 Mar 2025 03:18PM UTC coverage: 60.671% (-3.0%) from 63.629%
#3631

push

travis-ci

web-flow
Merge pull request #30074 from taosdata/ciup30

ci: update ci workflow to fix path issue

141481 of 300084 branches covered (47.15%)

Branch coverage included in aggregate %.

223132 of 300884 relevant lines covered (74.16%)

7878557.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.98
/source/libs/stream/src/tstreamFileState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "taos.h"
21
#include "tcommon.h"
22
#include "thash.h"
23
#include "tsimplehash.h"
24

25
#define FLUSH_RATIO                    0.5
26
#define FLUSH_NUM                      4
27
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
28
#define MIN_NUM_OF_ROW_BUFF            10240
29
#define MIN_NUM_OF_RECOVER_ROW_BUFF    128
30
#define MIN_NUM_SEARCH_BUCKET          128
31
#define MAX_ARRAY_SIZE                 1024
32
#define MAX_GROUP_ID_NUM               200000
33
#define NUM_OF_CACHE_WIN               64
34
#define MAX_NUM_OF_CACHE_WIN           128
35

36
#define TASK_KEY               "streamFileState"
37
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
38

39
struct SStreamFileState {
40
  SList*     usedBuffs;
41
  SList*     freeBuffs;
42
  void*      rowStateBuff;
43
  void*      pFileStore;
44
  int32_t    rowSize;
45
  int32_t    selectivityRowSize;
46
  int32_t    keyLen;
47
  uint64_t   preCheckPointVersion;
48
  uint64_t   checkPointVersion;
49
  TSKEY      maxTs;
50
  TSKEY      deleteMark;
51
  TSKEY      flushMark;
52
  uint64_t   maxRowCount;
53
  uint64_t   curRowCount;
54
  GetTsFun   getTs;
55
  char*      id;
56
  char*      cfName;
57
  void*      searchBuff;
58
  SSHashObj* pGroupIdMap;
59
  bool       hasFillCatch;
60

61
  _state_buff_cleanup_fn         stateBuffCleanupFn;
62
  _state_buff_remove_fn          stateBuffRemoveFn;
63
  _state_buff_remove_by_pos_fn   stateBuffRemoveByPosFn;
64
  _state_buff_create_statekey_fn stateBuffCreateStateKeyFn;
65

66
  _state_file_remove_fn stateFileRemoveFn;
67
  _state_file_get_fn    stateFileGetFn;
68

69
  _state_fun_get_fn stateFunctionGetFn;
70
};
71

72
typedef SRowBuffPos SRowBuffInfo;
73

74
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
7,725✔
75
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
7,725✔
76
  return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
7,725✔
77
}
78

79
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
3,328✔
80
  SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
3,328✔
81
  if (pos) {
3,328✔
82
    (*pos)->beFlushed = true;
1,756✔
83
  }
84
  return tSimpleHashRemove(pBuff, pKey, keyLen);
3,328✔
85
}
86

87
void stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
4,630,262✔
88
  size_t        keyLen = pFileState->keyLen;
4,630,262✔
89
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
4,630,262✔
90
  if (ppPos) {
4,630,154✔
91
    if ((*ppPos) == pPos) {
4,629,902!
92
      int32_t tmpRes = tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
4,629,903✔
93
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
4,629,994✔
94
    }
95
  }
96
}
4,630,311✔
97

98
void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); }
×
99

100
void stateHashBuffCleanupFn(void* pBuff) { tSimpleHashCleanup(pBuff); }
3,558✔
101

102
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
3,102✔
103
  return streamStateDel_rocksdb(pFileState->pFileStore, pKey);
3,102✔
104
}
105

106
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
285,766✔
107
  return streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
285,766✔
108
}
109

110
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
1,323,425✔
111
  SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey));
1,323,425!
112
  if (pStateKey == NULL) {
1,323,355!
113
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
114
    return NULL;
×
115
  }
116
  SWinKey* pWinKey = pPos->pKey;
1,323,355✔
117
  pStateKey->key = *pWinKey;
1,323,355✔
118
  pStateKey->opNum = num;
1,323,355✔
119
  return pStateKey;
1,323,355✔
120
}
121

122
void* defaultCreateStateKey(SRowBuffPos* pPos, int64_t num) {
517✔
123
  SWinKey* pStateKey = taosMemoryCalloc(1, sizeof(SWinKey));
517!
124
  if (pStateKey == NULL) {
517!
125
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
126
    return NULL;
×
127
  }
128
  SWinKey* pWinKey = pPos->pKey;
517✔
129
  *pStateKey = *pWinKey;
517✔
130
  return pStateKey;
517✔
131
}
132

133
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
1,458✔
134
  return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
1,458✔
135
}
136

137
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
258✔
138
  return streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
258✔
139
}
140

141
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
344✔
142
  SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey));
344!
143
  if (pStateKey == NULL) {
344!
144
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
145
    return NULL;
×
146
  }
147
  SSessionKey* pWinKey = pPos->pKey;
344✔
148
  pStateKey->key = *pWinKey;
344✔
149
  pStateKey->opNum = num;
344✔
150
  return pStateKey;
344✔
151
}
152

153
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); }
17!
154

155
static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
947✔
156
  *pLen = sizeof(TSKEY);
947✔
157
  (*pVal) = taosMemoryCalloc(1, *pLen);
947!
158
  if ((*pVal) == NULL) {
947!
159
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
160
    return terrno;
×
161
  }
162
  void*   buff = *pVal;
947✔
163
  int32_t tmp = taosEncodeFixedI64(&buff, *pKey);
947!
164
  return TSDB_CODE_SUCCESS;
947✔
165
}
166

167
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
4,607✔
168
                            void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type,
169
                            SStreamFileState** ppFileState) {
170
  int32_t code = TSDB_CODE_SUCCESS;
4,607✔
171
  int32_t lino = 0;
4,607✔
172
  if (memSize <= 0) {
4,607!
173
    memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
×
174
  }
175
  if (rowSize == 0) {
4,607!
176
    code = TSDB_CODE_INVALID_PARA;
×
177
    QUERY_CHECK_CODE(code, lino, _end);
×
178
  }
179

180
  SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
4,607!
181
  QUERY_CHECK_NULL(pFileState, code, lino, _end, terrno);
4,613!
182

183
  rowSize += selectRowSize;
4,613✔
184
  pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
4,613✔
185
  pFileState->usedBuffs = tdListNew(POINTER_BYTES);
4,613✔
186
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
4,613!
187

188
  pFileState->freeBuffs = tdListNew(POINTER_BYTES);
4,613✔
189
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
4,613!
190

191
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
4,613✔
192
  int32_t    cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
4,612✔
193
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
4,612✔
194
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
3,358✔
195
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
3,357✔
196
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
3,357✔
197
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
3,357✔
198
    pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
3,357✔
199

200
    pFileState->stateFileRemoveFn = intervalFileRemoveFn;
3,357✔
201
    pFileState->stateFileGetFn = intervalFileGetFn;
3,357✔
202
    pFileState->cfName = taosStrdup("state");
3,357!
203
    pFileState->stateFunctionGetFn = addRowBuffIfNotExist;
3,357✔
204
  } else if (type == STREAM_STATE_BUFF_SORT) {
1,254✔
205
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
1,053✔
206
    pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
1,053✔
207
    pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
1,053✔
208
    pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn;
1,053✔
209
    pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
1,053✔
210

211
    pFileState->stateFileRemoveFn = sessionFileRemoveFn;
1,053✔
212
    pFileState->stateFileGetFn = sessionFileGetFn;
1,053✔
213
    pFileState->cfName = taosStrdup("sess");
1,053!
214
    pFileState->stateFunctionGetFn = getSessionRowBuff;
1,053✔
215
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
201!
216
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
201✔
217
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
201✔
218
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
201!
219
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
201✔
220
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
201✔
221
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
201✔
222
    pFileState->stateBuffCreateStateKeyFn = defaultCreateStateKey;
201✔
223

224
    pFileState->stateFileRemoveFn = hashSortFileRemoveFn;
201✔
225
    pFileState->stateFileGetFn = hashSortFileGetFn;
201✔
226
    pFileState->cfName = taosStrdup("fill");
201!
227
    pFileState->stateFunctionGetFn = NULL;
201✔
228
  }
229

230
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
4,611!
231
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
4,611!
232
  QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno);
4,611!
233
  QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno);
4,611!
234

235
  if (type == STREAM_STATE_BUFF_HASH_SEARCH) {
4,611✔
236
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
70✔
237
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
70!
238
  }
239

240
  pFileState->keyLen = keySize;
4,611✔
241
  pFileState->rowSize = rowSize;
4,611✔
242
  pFileState->selectivityRowSize = selectRowSize;
4,611✔
243
  pFileState->preCheckPointVersion = 0;
4,611✔
244
  pFileState->checkPointVersion = 1;
4,611✔
245
  pFileState->pFileStore = pFile;
4,611✔
246
  pFileState->getTs = fp;
4,611✔
247
  pFileState->curRowCount = 0;
4,611✔
248
  pFileState->deleteMark = delMark;
4,611✔
249
  pFileState->flushMark = INT64_MIN;
4,611✔
250
  pFileState->maxTs = INT64_MIN;
4,611✔
251
  pFileState->id = taosStrdup(taskId);
4,611!
252
  QUERY_CHECK_NULL(pFileState->id, code, lino, _end, terrno);
4,612!
253

254
  pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
4,612✔
255
  QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
4,613!
256

257
  pFileState->hasFillCatch = true;
4,613✔
258

259
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
4,613✔
260
    code = recoverSnapshot(pFileState, checkpointId);
3,358✔
261
  } else if (type == STREAM_STATE_BUFF_SORT) {
1,255✔
262
    code = recoverSession(pFileState, checkpointId);
1,054✔
263
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
201!
264
    code = recoverFillSnapshot(pFileState, checkpointId);
201✔
265
  }
266
  QUERY_CHECK_CODE(code, lino, _end);
4,613!
267

268
  void*   valBuf = NULL;
4,613✔
269
  int32_t len = 0;
4,613✔
270
  int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
4,613✔
271
  if (tmpRes == TSDB_CODE_SUCCESS) {
4,613✔
272
    QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
17!
273
    streamFileStateDecode(&pFileState->flushMark, valBuf, len);
17✔
274
    qDebug("===stream===flushMark  read:%" PRId64, pFileState->flushMark);
17✔
275
  }
276
  taosMemoryFreeClear(valBuf);
4,613!
277
  (*ppFileState) = pFileState;
4,613✔
278

279
_end:
4,613✔
280
  if (code != TSDB_CODE_SUCCESS) {
4,613!
281
    streamFileStateDestroy(pFileState);
×
282
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
283
  }
284
  return code;
4,613✔
285
}
286

287
void destroyRowBuffPos(SRowBuffPos* pPos) {
6,960,634✔
288
  taosMemoryFreeClear(pPos->pKey);
6,960,634!
289
  taosMemoryFreeClear(pPos->pRowBuff);
6,968,066!
290
  taosMemoryFree(pPos);
6,967,479!
291
}
6,970,418✔
292

293
void destroyRowBuffPosPtr(void* ptr) {
1,397✔
294
  if (!ptr) {
1,397!
295
    return;
×
296
  }
297
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
1,397✔
298
  if (!pPos->beUsed) {
1,397✔
299
    destroyRowBuffPos(pPos);
1,237✔
300
  }
301
}
302

303
void destroyRowBuffAllPosPtr(void* ptr) {
2,281,775✔
304
  if (!ptr) {
2,281,775!
305
    return;
×
306
  }
307
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
2,281,775✔
308
  destroyRowBuffPos(pPos);
2,281,775✔
309
}
310

311
void destroyRowBuff(void* ptr) {
4,586,610✔
312
  if (!ptr) {
4,586,610!
313
    return;
×
314
  }
315
  taosMemoryFree(*(void**)ptr);
4,586,610!
316
}
317

318
void streamFileStateDestroy(SStreamFileState* pFileState) {
10,586✔
319
  if (!pFileState) {
10,586✔
320
    return;
5,975✔
321
  }
322

323
  taosMemoryFree(pFileState->id);
4,611!
324
  taosMemoryFree(pFileState->cfName);
4,613!
325
  tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
4,613✔
326
  tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
4,613✔
327
  pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
4,613✔
328
  sessionWinStateCleanup(pFileState->searchBuff);
4,613✔
329
  tSimpleHashCleanup(pFileState->pGroupIdMap);
4,613✔
330
  taosMemoryFree(pFileState);
4,613!
331
}
332

333
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
4,676,377✔
334
  int32_t code = TSDB_CODE_SUCCESS;
4,676,377✔
335
  int32_t lino = 0;
4,676,377✔
336
  if (pPos->pRowBuff) {
4,676,377✔
337
    code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
4,676,328✔
338
    QUERY_CHECK_CODE(code, lino, _end);
4,676,310!
339
    pPos->pRowBuff = NULL;
4,676,310✔
340
  }
341

342
_end:
49✔
343
  if (code != TSDB_CODE_SUCCESS) {
4,676,359!
344
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
345
  }
346
  return code;
4,676,349✔
347
}
348

349
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
4,170✔
350
  int32_t   code = TSDB_CODE_SUCCESS;
4,170✔
351
  int32_t   lino = 0;
4,170✔
352
  SListIter iter = {0};
4,170✔
353
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
4,170✔
354

355
  SListNode* pNode = NULL;
4,169✔
356
  while ((pNode = tdListNext(&iter)) != NULL) {
6,188,709✔
357
    SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
6,197,778✔
358
    if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) {
6,197,778!
359
      code = putFreeBuff(pFileState, pPos);
4,685,836✔
360
      QUERY_CHECK_CODE(code, lino, _end);
4,674,469!
361

362
      if (!all) {
4,674,469✔
363
        pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
4,629,273✔
364
      }
365
      destroyRowBuffPos(pPos);
4,674,519✔
366
      SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
4,674,528✔
367
      taosMemoryFreeClear(tmp);
4,674,386!
368
    }
369
  }
370

371
_end:
1,086✔
372
  if (code != TSDB_CODE_SUCCESS) {
1,086!
373
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
374
  }
375
}
1,086✔
376

377
int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool all) {
5,550✔
378
  int32_t   code = TSDB_CODE_SUCCESS;
5,550✔
379
  int32_t   lino = 0;
5,550✔
380
  uint64_t  i = 0;
5,550✔
381
  SListIter iter = {0};
5,550✔
382
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
5,550✔
383

384
  SListNode* pNode = NULL;
5,549✔
385
  while ((pNode = tdListNext(&iter)) != NULL && i < max) {
9,170✔
386
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
3,621✔
387
    if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) {
3,621✔
388
      if (all || !pPos->beUsed) {
1,579✔
389
        if (all && !pPos->pRowBuff) {
851!
390
          continue;
×
391
        }
392
        code = tdListAppend(pFlushList, &pPos);
851✔
393
        QUERY_CHECK_CODE(code, lino, _end);
851!
394

395
        pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
851✔
396
        pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
851✔
397
        SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
851✔
398
        taosMemoryFreeClear(tmp);
851!
399
        if (pPos->pRowBuff) {
851✔
400
          i++;
802✔
401
        }
402
      }
403
    }
404
  }
405

406
_end:
5,549✔
407
  if (code != TSDB_CODE_SUCCESS) {
5,549!
408
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
409
  }
410
  return code;
5,549✔
411
}
412

413
void streamFileStateClear(SStreamFileState* pFileState) {
1,564✔
414
  pFileState->flushMark = INT64_MIN;
1,564✔
415
  pFileState->maxTs = INT64_MIN;
1,564✔
416
  tSimpleHashClear(pFileState->rowStateBuff);
1,564✔
417
  clearExpiredRowBuff(pFileState, 0, true);
1,564✔
418
}
1,564✔
419

420
bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
141✔
421

422
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { pPos->beUsed = used; }
6,908,477✔
423

424
int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
4,521✔
425
  int32_t   code = TSDB_CODE_SUCCESS;
4,521✔
426
  int32_t   lino = 0;
4,521✔
427
  uint64_t  i = 0;
4,521✔
428
  SListIter iter = {0};
4,521✔
429
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
4,521✔
430

431
  SListNode* pNode = NULL;
4,521✔
432
  while ((pNode = tdListNext(&iter)) != NULL && i < max) {
5,369✔
433
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
848✔
434
    if (pPos->beUsed == used) {
848✔
435
      if (used && !pPos->pRowBuff) {
546!
436
        QUERY_CHECK_CONDITION((pPos->needFree == true), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
437
        continue;
×
438
      }
439
      code = tdListAppend(pFlushList, &pPos);
546✔
440
      QUERY_CHECK_CODE(code, lino, _end);
546!
441

442
      pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
546✔
443
      pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
546✔
444
      SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
546✔
445
      taosMemoryFreeClear(tmp);
546!
446
      if (pPos->pRowBuff) {
546!
447
        i++;
546✔
448
      }
449
    }
450
  }
451

452
  qInfo("stream state flush %d rows to disk. is used:%d", listNEles(pFlushList), used);
4,521!
453

454
_end:
×
455
  if (code != TSDB_CODE_SUCCESS) {
4,521!
456
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
457
  }
458
  return code;
4,521✔
459
}
460

461
int32_t flushRowBuff(SStreamFileState* pFileState) {
2,905✔
462
  int32_t          code = TSDB_CODE_SUCCESS;
2,905✔
463
  int32_t          lino = 0;
2,905✔
464
  SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
2,905✔
465
  if (!pFlushList) {
2,905!
466
    code = TSDB_CODE_OUT_OF_MEMORY;
×
467
    QUERY_CHECK_CODE(code, lino, _end);
×
468
  }
469

470
  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
2,905✔
471
  num = TMAX(num, FLUSH_NUM);
2,905✔
472
  code = clearFlushedRowBuff(pFileState, pFlushList, num, false);
2,905✔
473
  QUERY_CHECK_CODE(code, lino, _end);
2,905!
474

475
  if (isListEmpty(pFlushList)) {
2,905✔
476
    code = popUsedBuffs(pFileState, pFlushList, num, false);
2,361✔
477
    QUERY_CHECK_CODE(code, lino, _end);
2,361!
478

479
    if (isListEmpty(pFlushList)) {
2,361✔
480
      code = popUsedBuffs(pFileState, pFlushList, num, true);
2,160✔
481
      QUERY_CHECK_CODE(code, lino, _end);
2,160!
482
    }
483
  }
484

485
  if (pFileState->searchBuff) {
2,905✔
486
    code = clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true);
2,645✔
487
    QUERY_CHECK_CODE(code, lino, _end);
2,645!
488
  }
489

490
  flushSnapshot(pFileState, pFlushList, false);
2,905✔
491

492
  SListIter fIter = {0};
2,904✔
493
  tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
2,904✔
494
  SListNode* pNode = NULL;
2,904✔
495
  while ((pNode = tdListNext(&fIter)) != NULL) {
4,301✔
496
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
1,397✔
497
    code = putFreeBuff(pFileState, pPos);
1,397✔
498
    QUERY_CHECK_CODE(code, lino, _end);
1,397!
499
  }
500

501
  tdListFreeP(pFlushList, destroyRowBuffPosPtr);
2,905✔
502

503
_end:
2,905✔
504
  if (code != TSDB_CODE_SUCCESS) {
2,905!
505
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
506
  }
507
  return code;
2,905✔
508
}
509

510
int32_t clearRowBuff(SStreamFileState* pFileState) {
3,522✔
511
  if (pFileState->deleteMark != INT64_MAX) {
3,522!
512
    clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
×
513
  }
514
  if (isListEmpty(pFileState->freeBuffs)) {
3,522✔
515
    return flushRowBuff(pFileState);
2,905✔
516
  }
517
  return TSDB_CODE_SUCCESS;
617✔
518
}
519

520
void* getFreeBuff(SStreamFileState* pFileState) {
6,975,858✔
521
  SList*     lists = pFileState->freeBuffs;
6,975,858✔
522
  int32_t    buffSize = pFileState->rowSize;
6,975,858✔
523
  SListNode* pNode = tdListPopHead(lists);
6,975,858✔
524
  if (!pNode) {
6,975,858✔
525
    return NULL;
6,935,542✔
526
  }
527
  void* ptr = *(void**)pNode->data;
40,316✔
528
  memset(ptr, 0, buffSize);
40,316✔
529
  taosMemoryFree(pNode);
40,316!
530
  return ptr;
40,327✔
531
}
532

533
void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
×
534
  if (pPos->pRowBuff) {
×
535
    memset(pPos->pRowBuff, 0, pFileState->rowSize);
×
536
  }
537
}
×
538

539
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
6,975,219✔
540
  int32_t      code = TSDB_CODE_SUCCESS;
6,975,219✔
541
  int32_t      lino = 0;
6,975,219✔
542
  SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
6,975,219!
543
  if (!pPos) {
6,975,276!
544
    code = terrno;
×
545
    QUERY_CHECK_CODE(code, lino, _error);
×
546
  }
547

548
  pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
6,975,276!
549
  if (!pPos->pKey) {
6,975,253!
550
    code = terrno;
×
551
    QUERY_CHECK_CODE(code, lino, _error);
×
552
  }
553

554
  void* pBuff = getFreeBuff(pFileState);
6,975,253✔
555
  if (pBuff) {
6,975,256✔
556
    pPos->pRowBuff = pBuff;
39,838✔
557
    goto _end;
39,838✔
558
  }
559

560
  if (pFileState->curRowCount < pFileState->maxRowCount) {
6,935,418✔
561
    pBuff = taosMemoryCalloc(1, pFileState->rowSize);
6,935,279!
562
    QUERY_CHECK_NULL(pBuff, code, lino, _error, terrno);
6,935,260!
563
    pPos->pRowBuff = pBuff;
6,935,260✔
564
    pFileState->curRowCount++;
6,935,260✔
565
    goto _end;
6,935,260✔
566
  }
567

568
  code = clearRowBuff(pFileState);
139✔
569
  QUERY_CHECK_CODE(code, lino, _error);
144!
570

571
  pPos->pRowBuff = getFreeBuff(pFileState);
144✔
572

573
_end:
6,975,242✔
574
  code = tdListAppend(pFileState->usedBuffs, &pPos);
6,975,242✔
575
  QUERY_CHECK_CODE(code, lino, _error);
6,975,253!
576

577
  QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
6,975,253✔
578
_error:
6,975,252✔
579
  if (code != TSDB_CODE_SUCCESS) {
6,975,253✔
580
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1!
581
    return NULL;
1✔
582
  }
583

584
  return pPos;
6,975,252✔
585
}
586

587
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
6,975,228✔
588
  int32_t      code = TSDB_CODE_SUCCESS;
6,975,228✔
589
  int32_t      lino = 0;
6,975,228✔
590
  SRowBuffPos* newPos = getNewRowPos(pFileState);
6,975,228✔
591
  if (!newPos) {
6,975,256✔
592
    code = TSDB_CODE_OUT_OF_MEMORY;
1✔
593
    QUERY_CHECK_CODE(code, lino, _error);
1!
594
  }
595
  newPos->beUsed = true;
6,975,255✔
596
  newPos->beFlushed = false;
6,975,255✔
597
  newPos->needFree = false;
6,975,255✔
598
  newPos->beUpdated = true;
6,975,255✔
599
  return newPos;
6,975,255✔
600

601
_error:
1✔
602
  if (code != TSDB_CODE_SUCCESS) {
1!
603
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1!
604
  }
605
  return NULL;
1✔
606
}
607

608
int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
7,694,916✔
609
                             int32_t* pWinCode) {
610
  int32_t code = TSDB_CODE_SUCCESS;
7,694,916✔
611
  int32_t lino = 0;
7,694,916✔
612
  (*pWinCode) = TSDB_CODE_SUCCESS;
7,694,916✔
613
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
7,694,916✔
614
  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
7,694,920✔
615
  if (pos) {
7,695,024✔
616
    if (pVal != NULL) {
726,793!
617
      *pVLen = pFileState->rowSize;
726,801✔
618
      *pVal = *pos;
726,801✔
619
      (*pos)->beUsed = true;
726,801✔
620
      (*pos)->beFlushed = false;
726,801✔
621
    }
622
    goto _end;
726,793✔
623
  }
624
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
6,968,231✔
625
  if (!pNewPos || !pNewPos->pRowBuff) {
6,968,243!
626
    code = TSDB_CODE_OUT_OF_MEMORY;
×
627
    QUERY_CHECK_CODE(code, lino, _end);
×
628
  }
629

630
  memcpy(pNewPos->pKey, pKey, keyLen);
6,968,243✔
631
  (*pWinCode) = TSDB_CODE_FAILED;
6,968,243✔
632

633
  TSKEY ts = pFileState->getTs(pKey);
6,968,243✔
634
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
6,968,230✔
635
    int32_t len = 0;
286,105✔
636
    void*   p = NULL;
286,105✔
637
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
286,105✔
638
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
286,103✔
639
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
286,103✔
640
      memcpy(pNewPos->pRowBuff, p, len);
563✔
641
    }
642
    taosMemoryFree(p);
286,103!
643
  }
644

645
  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
6,968,223✔
646
  QUERY_CHECK_CODE(code, lino, _end);
6,968,207!
647

648
  if (pVal) {
6,968,207✔
649
    *pVLen = pFileState->rowSize;
6,968,203✔
650
    *pVal = pNewPos;
6,968,203✔
651
  }
652

653
_end:
4✔
654
  if (code != TSDB_CODE_SUCCESS) {
7,695,000!
655
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
656
  }
657
  return code;
7,694,926✔
658
}
659

660
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
4,493✔
661
  int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
4,493✔
662
  qTrace("%s at line %d res:%d", __func__, __LINE__, code_buff);
4,493✔
663
  int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
4,493✔
664
  qTrace("%s at line %d res:%d", __func__, __LINE__, code_file);
4,493✔
665
  if (pFileState->searchBuff != NULL) {
4,493✔
666
    deleteHashSortRowBuff(pFileState, pKey);
41✔
667
  }
668
}
4,493✔
669

670
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) {
55✔
671
  SSHashObj* pRowMap = pFileState->rowStateBuff;
55✔
672
  void*   pIte = NULL;
55✔
673
  int32_t iter = 0;
55✔
674
  while ((pIte = tSimpleHashIterate(pRowMap, pIte, &iter)) != NULL) {
115✔
675
    size_t keyLen = 0;
60✔
676
    SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
60!
677
    if (pKey->groupId == groupId) {
60!
678
      int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pIte, &iter);
60✔
679
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
60!
680
    }
681
  }
682

683
  while (1) {
340✔
684
    SWinKey tmp = {.ts = INT64_MIN, .groupId = groupId};
395✔
685
    SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &tmp);
395✔
686
    SWinKey delKey = {.groupId = groupId};
395✔
687
    int32_t code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0);
395✔
688
    if (code != TSDB_CODE_SUCCESS) {
395✔
689
      break;
55✔
690
    }
691
    code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey);
340✔
692
    qTrace("%s at line %d res:%d", __func__, __LINE__, code);
340!
693
  }
694
}
55✔
695

696
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
346✔
697
  int32_t code = TSDB_CODE_SUCCESS;
346✔
698
  int32_t lino = 0;
346✔
699
  int32_t len = 0;
346✔
700
  void*   pBuff = NULL;
346✔
701
  code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len);
346✔
702
  QUERY_CHECK_CODE(code, lino, _end);
346!
703
  memcpy(pPos->pRowBuff, pBuff, len);
346✔
704
  taosMemoryFree(pBuff);
346!
705

706
_end:
346✔
707
  if (code != TSDB_CODE_SUCCESS) {
346!
708
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
709
  }
710
  return code;
346✔
711
}
712

713
static int32_t recoverStateRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
346✔
714
  int32_t code = TSDB_CODE_SUCCESS;
346✔
715
  int32_t lino = 0;
346✔
716
  pPos->pRowBuff = getFreeBuff(pFileState);
346✔
717
  if (!pPos->pRowBuff) {
346✔
718
    if (pFileState->curRowCount < pFileState->maxRowCount) {
116!
719
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
×
720
      if (!pPos->pRowBuff) {
×
721
        code = terrno;
×
722
        QUERY_CHECK_CODE(code, lino, _end);
×
723
      }
724
      pFileState->curRowCount++;
×
725
    } else {
726
      code = clearRowBuff(pFileState);
116✔
727
      QUERY_CHECK_CODE(code, lino, _end);
116!
728
      pPos->pRowBuff = getFreeBuff(pFileState);
116✔
729
    }
730
    QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
116!
731
  }
732

733
  code = recoverSessionRowBuff(pFileState, pPos);
346✔
734
  QUERY_CHECK_CODE(code, lino, _end);
346!
735

736
_end:
346✔
737
  if (code != TSDB_CODE_SUCCESS) {
346!
738
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
739
  }
740
  return code;
346✔
741
}
742

743
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
6,856,762✔
744
  int32_t code = TSDB_CODE_SUCCESS;
6,856,762✔
745
  int32_t lino = 0;
6,856,762✔
746
  if (pPos->pRowBuff) {
6,856,762✔
747
    if (pPos->needFree) {
6,856,424!
748
      code = recoverSessionRowBuff(pFileState, pPos);
×
749
      QUERY_CHECK_CODE(code, lino, _end);
×
750
    }
751
    (*pVal) = pPos->pRowBuff;
6,856,424✔
752
    goto _end;
6,856,424✔
753
  }
754

755
  code = recoverStateRowBuff(pFileState, pPos);
338✔
756
  QUERY_CHECK_CODE(code, lino, _end);
346!
757

758
  (*pVal) = pPos->pRowBuff;
346✔
759
  if (!pPos->needFree) {
346✔
760
    code = tdListPrepend(pFileState->usedBuffs, &pPos);
159✔
761
    QUERY_CHECK_CODE(code, lino, _end);
159!
762
  }
763

764
_end:
346✔
765
  if (code != TSDB_CODE_SUCCESS) {
6,856,770!
766
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
767
  }
768
  return code;
6,856,768✔
769
}
770

771
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
26✔
772
  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
26✔
773
  if (pos) {
26✔
774
    return true;
12✔
775
  }
776
  return false;
14✔
777
}
778

779
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
2,606✔
780
  int64_t mark = (pFileState->deleteMark == INT64_MAX || pFileState->maxTs == INT64_MIN)
1,780✔
781
                     ? INT64_MIN
782
                     : pFileState->maxTs - pFileState->deleteMark;
4,386✔
783
  clearExpiredRowBuff(pFileState, mark, false);
2,606✔
784
  return pFileState->usedBuffs;
2,606✔
785
}
786

787
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
5,509✔
788
  int32_t   code = TSDB_CODE_SUCCESS;
5,509✔
789
  int32_t   lino = 0;
5,509✔
790
  SListIter iter = {0};
5,509✔
791
  tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);
5,509✔
792

793
  const int32_t BATCH_LIMIT = 256;
5,509✔
794

795
  int64_t    st = taosGetTimestampMs();
5,511✔
796
  SListNode* pNode = NULL;
5,511✔
797

798
  int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
5,511✔
799

800
  int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
5,511✔
801
  char*   buf = taosMemoryCalloc(1, len);
5,511!
802
  if (!buf) {
5,512!
803
    code = terrno;
×
804
    QUERY_CHECK_CODE(code, lino, _end);
×
805
  }
806

807
  void* batch = streamStateCreateBatch();
5,512✔
808
  if (!batch) {
5,512!
809
    code = TSDB_CODE_OUT_OF_MEMORY;
×
810
    QUERY_CHECK_CODE(code, lino, _end);
×
811
  }
812

813
  while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
1,705,502!
814
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
1,698,741✔
815
    if (pPos->beFlushed || !pPos->pRowBuff) {
1,698,741!
816
      continue;
375,278✔
817
    }
818
    pPos->beFlushed = true;
1,323,463✔
819
    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
1,323,463✔
820

821
    qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
1,323,090✔
822
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
1,323,091✔
823
      code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
5,103✔
824
      streamStateClearBatch(batch);
5,103✔
825
      QUERY_CHECK_CODE(code, lino, _end);
5,103!
826
    }
827

828
    void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
1,324,472✔
829
    QUERY_CHECK_NULL(pSKey, code, lino, _end, terrno);
1,323,771!
830

831
    code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
1,323,771✔
832
                                       0, buf);
833
    taosMemoryFreeClear(pSKey);
1,323,955!
834
    QUERY_CHECK_CODE(code, lino, _end);
1,324,712!
835
    // todo handle failure
836
    memset(buf, 0, len);
1,324,712✔
837
  }
838
  taosMemoryFreeClear(buf);
5,219!
839

840
  int32_t numOfElems = streamStateGetBatchSize(batch);
5,220✔
841
  if (numOfElems > 0) {
5,513✔
842
    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
1,446✔
843
    QUERY_CHECK_CODE(code, lino, _end);
1,446!
844
  } else {
845
    goto _end;
4,067✔
846
  }
847

848
  streamStateClearBatch(batch);
1,446✔
849

850
  clearSearchBuff(pFileState);
1,446✔
851

852
  int64_t elapsed = taosGetTimestampMs() - st;
1,446✔
853
  qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
1,446✔
854
         pFileState->id, numOfElems, BATCH_LIMIT, elapsed);
855

856
  if (flushState) {
1,446✔
857
    void*   valBuf = NULL;
947✔
858
    int32_t len = 0;
947✔
859
    code = streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
947✔
860
    QUERY_CHECK_CODE(code, lino, _end);
947!
861

862
    qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
947✔
863
    code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, len, 0);
947✔
864
    taosMemoryFree(valBuf);
947!
865
    QUERY_CHECK_CODE(code, lino, _end);
947!
866

867
    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
947✔
868
    QUERY_CHECK_CODE(code, lino, _end);
947!
869
  }
870

871
_end:
499✔
872
  if (code != TSDB_CODE_SUCCESS) {
5,513!
873
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
874
  }
875
  taosMemoryFree(buf);
5,513!
876
  streamStateDestroyBatch(batch);
5,513✔
877
}
5,513✔
878

879
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
×
880
  char keyBuf[128] = {0};
×
881
  TAOS_UNUSED(tsnprintf(keyBuf, sizeof(keyBuf), "%s:%" PRId64 "", TASK_KEY, checkpointId));
×
882
  return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
×
883
}
884

885
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
×
886
  int32_t code = TSDB_CODE_SUCCESS;
×
887
  int64_t maxCheckPointId = 0;
×
888
  {
889
    char    buf[128] = {0};
×
890
    void*   val = NULL;
×
891
    int32_t len = 0;
×
892
    memcpy(buf, TASK_KEY, strlen(TASK_KEY));
×
893
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
×
894
    if (code != 0 || len == 0 || val == NULL) {
×
895
      return TSDB_CODE_FAILED;
×
896
    }
897
    memcpy(buf, val, len);
×
898
    buf[len] = 0;
×
899
    maxCheckPointId = taosStr2Int64((char*)buf, NULL, 10);
×
900
    taosMemoryFree(val);
×
901
  }
902
  for (int64_t i = maxCheckPointId; i > 0; i--) {
×
903
    char    buf[128] = {0};
×
904
    void*   val = 0;
×
905
    int32_t len = 0;
×
906
    TAOS_UNUSED(tsnprintf(buf, sizeof(buf), "%s:%" PRId64 "", TASK_KEY, i));
×
907
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
×
908
    if (code != 0) {
×
909
      return TSDB_CODE_FAILED;
×
910
    }
911
    memcpy(buf, val, len);
×
912
    buf[len] = 0;
×
913
    taosMemoryFree(val);
×
914

915
    TSKEY ts;
916
    ts = taosStr2Int64((char*)buf, NULL, 10);
×
917
    if (ts < mark) {
×
918
      // statekey winkey.ts < mark
919
      int32_t tmpRes = forceRemoveCheckpoint(pFileState, i);
×
920
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
921
      break;
×
922
    }
923
  }
924
  return code;
×
925
}
926

927
int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
1,052✔
928
  int32_t code = TSDB_CODE_SUCCESS;
1,052✔
929
  int32_t lino = 0;
1,052✔
930
  int32_t winRes = TSDB_CODE_SUCCESS;
1,052✔
931
  if (pFileState->maxTs != INT64_MIN) {
1,052!
932
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
933
                       ? INT64_MIN
934
                       : pFileState->maxTs - pFileState->deleteMark;
×
935
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
×
936
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
937
  }
938

939
  SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore, INT64_MAX);
1,052✔
940
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
1,054✔
941
  while (winRes == TSDB_CODE_SUCCESS) {
1,054!
942
    if (pFileState->curRowCount >= recoverNum) {
1,054!
943
      break;
1,054✔
944
    }
945

946
    void*       pVal = NULL;
1,054✔
947
    int32_t     vlen = 0;
1,054✔
948
    SSessionKey key = {0};
1,054✔
949
    winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen);
1,054✔
950
    if (winRes != TSDB_CODE_SUCCESS) {
1,054!
951
      break;
1,054✔
952
    }
953

954
    if (vlen != pFileState->rowSize) {
×
955
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
956
      QUERY_CHECK_CODE(code, lino, _end);
×
957
    }
958

959
    SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
×
960
    pPos->beUsed = false;
×
961
    winRes = putSessionWinResultBuff(pFileState, pPos);
×
962
    if (winRes != TSDB_CODE_SUCCESS) {
×
963
      break;
×
964
    }
965

966
    winRes = streamStateSessionCurPrev_rocksdb(pCur);
×
967
  }
968

969
_end:
×
970
  if (code != TSDB_CODE_SUCCESS) {
1,054!
971
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
972
  }
973
  streamStateFreeCur(pCur);
1,054✔
974
  return code;
1,054✔
975
}
976

977
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
3,357✔
978
  int32_t code = TSDB_CODE_SUCCESS;
3,357✔
979
  int32_t lino = 0;
3,357✔
980
  int32_t winCode = TSDB_CODE_SUCCESS;
3,357✔
981
  if (pFileState->maxTs != INT64_MIN) {
3,357!
982
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
983
                       ? INT64_MIN
984
                       : pFileState->maxTs - pFileState->deleteMark;
×
985
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
×
986
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
987
  }
988

989
  SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
3,357✔
990
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
3,358✔
991
  while (winCode == TSDB_CODE_SUCCESS) {
3,358!
992
    if (pFileState->curRowCount >= recoverNum) {
3,358!
993
      break;
3,358✔
994
    }
995

996
    void*        pVal = NULL;
3,358✔
997
    int32_t      vlen = 0;
3,358✔
998
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
3,358✔
999
    if (!pNewPos || !pNewPos->pRowBuff) {
3,358!
1000
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1001
      QUERY_CHECK_CODE(code, lino, _end);
×
1002
    }
1003

1004
    winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
3,358✔
1005
    qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__);
3,357✔
1006
    if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
3,357!
1007
      destroyRowBuffPos(pNewPos);
3,357✔
1008
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
3,358✔
1009
      taosMemoryFreeClear(pNode);
3,358!
1010
      taosMemoryFreeClear(pVal);
3,358!
1011
      break;
3,358✔
1012
    }
1013
    if (vlen != pFileState->rowSize) {
×
1014
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
×
1015
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1016
      taosMemoryFreeClear(pVal);
×
1017
      QUERY_CHECK_CODE(code, lino, _end);
×
1018
    }
1019
    memcpy(pNewPos->pRowBuff, pVal, vlen);
×
1020
    taosMemoryFreeClear(pVal);
×
1021
    pNewPos->beFlushed = true;
×
1022
    pNewPos->beUsed = false;
×
1023
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
×
1024
    code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
×
1025
    if (code != TSDB_CODE_SUCCESS) {
×
1026
      destroyRowBuffPos(pNewPos);
×
1027
      break;
×
1028
    }
1029
    streamStateCurPrev_rocksdb(pCur);
×
1030
  }
1031

1032
_end:
×
1033
  if (code != TSDB_CODE_SUCCESS) {
3,358!
1034
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1035
  }
1036
  streamStateFreeCur(pCur);
3,358✔
1037
  return code;
3,358✔
1038
}
1039

1040
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
177,390✔
1041

1042
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
1,152✔
1043
  pFileState->flushMark = TMAX(pFileState->flushMark, ts);
1,152✔
1044
  pFileState->maxTs = TMAX(pFileState->maxTs, ts);
1,152✔
1045
}
1,152✔
1046

1047
void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStateBuff; }
27,288✔
1048
void* getSearchBuff(SStreamFileState* pFileState) { return pFileState->searchBuff; }
7,521,144✔
1049

1050
void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; }
14,578✔
1051

1052
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
6,969,259✔
1053
  return pFileState->deleteMark != INT64_MAX && pFileState->maxTs > 0 &&
11,612,921!
1054
         ts < (pFileState->maxTs - pFileState->deleteMark);
4,643,662✔
1055
}
1056

1057
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
4,199,730✔
1058

1059
TSKEY getFlushMark(SStreamFileState* pFileState) { return pFileState->flushMark; };
455✔
1060

1061
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
177,793✔
1062

1063
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
177,384✔
1064
  int32_t winCode = TSDB_CODE_SUCCESS;
177,384✔
1065
  return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen, &winCode);
177,384✔
1066
}
1067

1068
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
201✔
1069
  int32_t code = TSDB_CODE_SUCCESS;
201✔
1070
  int32_t lino = 0;
201✔
1071
  if (pFileState->maxTs != INT64_MIN) {
201!
1072
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
1073
                       ? INT64_MIN
1074
                       : pFileState->maxTs - pFileState->deleteMark;
×
1075
    code = deleteExpiredCheckPoint(pFileState, mark);
×
1076
    QUERY_CHECK_CODE(code, lino, _end);
×
1077
  }
1078

1079
  SStreamStateCur* pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore);
201✔
1080
  if (pCur == NULL) {
201✔
1081
    return code;
199✔
1082
  }
1083
  int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
2✔
1084
  int32_t winRes = TSDB_CODE_SUCCESS;
2✔
1085
  while (winRes == TSDB_CODE_SUCCESS) {
4!
1086
    if (pFileState->curRowCount >= recoverNum) {
4!
1087
      break;
2✔
1088
    }
1089

1090
    void*        pVal = NULL;
4✔
1091
    int32_t      vlen = 0;
4✔
1092
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
4✔
1093
    winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
4✔
1094
    qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__);
4!
1095
    if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
4!
1096
      destroyRowBuffPos(pNewPos);
2✔
1097
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
2✔
1098
      taosMemoryFreeClear(pNode);
2!
1099
      taosMemoryFreeClear(pVal);
2!
1100
      break;
2✔
1101
    }
1102

1103
    if (vlen != pFileState->rowSize) {
2!
1104
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
×
1105
      destroyRowBuffPos(pNewPos);
×
1106
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1107
      taosMemoryFreeClear(pVal);
×
1108
      QUERY_CHECK_CODE(code, lino, _end);
×
1109
    }
1110

1111
    memcpy(pNewPos->pRowBuff, pVal, vlen);
2✔
1112
    taosMemoryFreeClear(pVal);
2!
1113
    pNewPos->beFlushed = true;
2✔
1114
    pNewPos->beUsed = false;
2✔
1115
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
2!
1116
    winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
2✔
1117
    if (winRes != TSDB_CODE_SUCCESS) {
2!
1118
      destroyRowBuffPos(pNewPos);
×
1119
      break;
×
1120
    }
1121
    streamStateCurPrev_rocksdb(pCur);
2✔
1122
  }
1123
  streamStateFreeCur(pCur);
2✔
1124

1125
_end:
2✔
1126
  if (code != TSDB_CODE_SUCCESS) {
2!
1127
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1128
  }
1129
  return code;
2✔
1130
}
1131

1132
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
1,326✔
1133
                   int32_t* pWinCode) {
1134
  int32_t code = TSDB_CODE_SUCCESS;
1,326✔
1135
  int32_t lino = 0;
1,326✔
1136
  (*pWinCode) = TSDB_CODE_FAILED;
1,326✔
1137
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
1,326✔
1138
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
1,326✔
1139
  if (ppPos) {
1,326✔
1140
    *pVLen = pFileState->rowSize;
608✔
1141
    *pVal = *ppPos;
608✔
1142
    (*ppPos)->beUsed = true;
608✔
1143
    (*ppPos)->beFlushed = false;
608✔
1144
    (*pWinCode) = TSDB_CODE_SUCCESS;
608✔
1145
    if ((*ppPos)->pRowBuff == NULL) {
608!
1146
      code = recoverStateRowBuff(pFileState, *ppPos);
×
1147
      QUERY_CHECK_CODE(code, lino, _end);
×
1148
    }
1149
    goto _end;
608✔
1150
  }
1151
  TSKEY ts = pFileState->getTs(pKey);
718✔
1152
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
718!
1153
    int32_t len = 0;
17✔
1154
    void*   p = NULL;
17✔
1155
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
17✔
1156
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
17!
1157
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
17!
1158
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
17✔
1159
      if (!pNewPos || !pNewPos->pRowBuff) {
17!
1160
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1161
        QUERY_CHECK_CODE(code, lino, _end);
×
1162
      }
1163

1164
      memcpy(pNewPos->pKey, pKey, keyLen);
17✔
1165
      memcpy(pNewPos->pRowBuff, p, len);
17✔
1166
      code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
17✔
1167
      QUERY_CHECK_CODE(code, lino, _end);
17!
1168

1169
      if (pVal) {
17!
1170
        *pVLen = pFileState->rowSize;
17✔
1171
        *pVal = pNewPos;
17✔
1172
      }
1173
    }
1174
    taosMemoryFree(p);
17!
1175
  }
1176

1177
_end:
700✔
1178
  if (code != TSDB_CODE_SUCCESS) {
1,325!
1179
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1180
  }
1181
  return code;
1,325✔
1182
}
1183

1184
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen) {
200✔
1185
  int32_t code = TSDB_CODE_SUCCESS;
200✔
1186
  int32_t lino = 0;
200✔
1187
  if (value != NULL) {
200!
1188
    code = TSDB_CODE_INVALID_PARA;
×
1189
    QUERY_CHECK_CODE(code, lino, _end);
×
1190
  }
1191

1192
  if (tSimpleHashGet(pFileState->pGroupIdMap, &groupId, sizeof(int64_t)) == NULL) {
200✔
1193
    if (tSimpleHashGetSize(pFileState->pGroupIdMap) <= MAX_GROUP_ID_NUM) {
50!
1194
      code = tSimpleHashPut(pFileState->pGroupIdMap, &groupId, sizeof(int64_t), NULL, 0);
50✔
1195
      QUERY_CHECK_CODE(code, lino, _end);
50!
1196
    }
1197
    code = streamStatePutParTag_rocksdb(pFileState->pFileStore, groupId, value, vLen);
50✔
1198
    QUERY_CHECK_CODE(code, lino, _end);
50!
1199
  }
1200

1201
_end:
200✔
1202
  if (code != TSDB_CODE_SUCCESS) {
200!
1203
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1204
  }
1205
  return code;
200✔
1206
}
1207

1208
void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
886✔
1209
  SStreamFileState* pFileState = (SStreamFileState*)pCur->pStreamFileState;
886✔
1210
  if (pCur->hashIter == -1) {
886✔
1211
    streamStateCurNext(pFileState->pFileStore, pCur);
68✔
1212
    return;
68✔
1213
  }
1214

1215
  int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
818!
1216
  pCur->minGpId = TMAX(pCur->minGpId, gpId);
818✔
1217

1218
  SSHashObj* pHash = pFileState->pGroupIdMap;
818✔
1219
  pCur->pHashData = tSimpleHashIterate(pHash, pCur->pHashData, &pCur->hashIter);
818✔
1220
  if (!pCur->pHashData) {
818✔
1221
    pCur->hashIter = -1;
539✔
1222
    streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur);
539✔
1223
    return;
539✔
1224
  }
1225
}
1226

1227
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
2,555✔
1228
  int32_t code = TSDB_CODE_SUCCESS;
2,555✔
1229
  if (pCur->pHashData) {
2,555✔
1230
    *pKey = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
818!
1231
    return code;
818✔
1232
  }
1233
  return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL);
1,737✔
1234
}
1235

1236
SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
1,670✔
1237
  return pFileState->pGroupIdMap;
1,670✔
1238
}
1239

1240
void clearExpiredState(SStreamFileState* pFileState) {
3,261✔
1241
  int32_t    code = TSDB_CODE_SUCCESS;
3,261✔
1242
  int32_t    lino = 0;
3,261✔
1243
  SSHashObj* pSearchBuff = pFileState->searchBuff;
3,261✔
1244
  void*      pIte = NULL;
3,261✔
1245
  int32_t    iter = 0;
3,261✔
1246
  while ((pIte = tSimpleHashIterate(pSearchBuff, pIte, &iter)) != NULL) {
4,904✔
1247
    SArray* pWinStates = *((void**)pIte);
1,643✔
1248
    int32_t size = taosArrayGetSize(pWinStates);
1,643✔
1249
    for (int32_t i = 0; i < size - 1; i++) {
1,936✔
1250
      SWinKey* pKey = taosArrayGet(pWinStates, i);
293✔
1251
      int32_t  code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
293✔
1252
      qTrace("clear expired buff, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_buff);
293!
1253

1254
      if (isFlushedState(pFileState, pKey->ts, 0)) {
293✔
1255
        int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
219✔
1256
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file);
219!
1257
      }
1258
    }
1259
    taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL);
1,643✔
1260
  }
1261
  code = clearRowBuff(pFileState);
3,262✔
1262
  QUERY_CHECK_CODE(code, lino, _end);
3,262!
1263

1264
_end:
3,262✔
1265
  if (code != TSDB_CODE_SUCCESS) {
3,262!
1266
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1267
  }
1268
}
3,262✔
1269

1270
#ifdef BUILD_NO_CALL
1271
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
1272
                           int32_t* pWinCode) {
1273
  int32_t code = TSDB_CODE_SUCCESS;
1274
  int32_t lino = 0;
1275

1276
  code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
1277
  QUERY_CHECK_CODE(code, lino, _end);
1278

1279
  SArray*    pWinStates = NULL;
1280
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
1281
  void**     ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
1282
  if (ppBuff) {
1283
    pWinStates = (SArray*)(*ppBuff);
1284
  } else {
1285
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
1286
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);
1287

1288
    code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
1289
    QUERY_CHECK_CODE(code, lino, _end);
1290
  }
1291

1292
  // recover
1293
  if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
1294
    TSKEY            ts = getFlushMark(pFileState);
1295
    SWinKey          start = {.groupId = pKey->groupId, .ts = INT64_MAX};
1296
    void*            pState = getStateFileStore(pFileState);
1297
    SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, &start);
1298
    for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
1299
      SWinKey tmpKey = {.groupId = pKey->groupId};
1300
      int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pState, pCur, &tmpKey, NULL, 0);
1301
      if (tmpRes != TSDB_CODE_SUCCESS) {
1302
        break;
1303
      }
1304
      void* tmp = taosArrayPush(pWinStates, &tmpKey);
1305
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1306
      streamStateCurPrev_rocksdb(pCur);
1307
    }
1308
    taosArraySort(pWinStates, winKeyCmprImpl);
1309
    streamStateFreeCur(pCur);
1310
  }
1311

1312
  int32_t size = taosArrayGetSize(pWinStates);
1313
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
1314
  if (!isFlushedState(pFileState, pKey->ts, 0)|| index >= 0) {
1315
    // find the first position which is smaller than the pKey
1316
    if (index >= 0) {
1317
      SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
1318
      if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
1319
        goto _end;
1320
      }
1321
    }
1322
    index++;
1323
    void* tmp = taosArrayInsert(pWinStates, index, pKey);
1324
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1325
  }
1326

1327
  if (size >= MAX_NUM_OF_CACHE_WIN) {
1328
    int32_t num = size - NUM_OF_CACHE_WIN;
1329
    taosArrayRemoveBatch(pWinStates, 0, num, NULL);
1330
  }
1331

1332
_end:
1333
  if (code != TSDB_CODE_SUCCESS) {
1334
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1335
  }
1336
  return code;
1337
}
1338
#endif
1339

1340
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
105✔
1341
                           int32_t* pVLen, int32_t* pWinCode) {
1342
  int32_t    code = TSDB_CODE_SUCCESS;
105✔
1343
  int32_t    lino = 0;
105✔
1344
  SArray*    pWinStates = NULL;
105✔
1345
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
105✔
1346
  void*      pState = getStateFileStore(pFileState);
105✔
1347
  void**     ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
105✔
1348
  if (ppBuff) {
105!
1349
    pWinStates = (SArray*)(*ppBuff);
105✔
1350
  } else {
1351
    qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
×
1352
    SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
×
1353
    void*            tmpVal = NULL;
×
1354
    int32_t          len = 0;
×
1355
    (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
×
1356
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
×
1357
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
1358
      if (!pNewPos || !pNewPos->pRowBuff) {
×
1359
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1360
        QUERY_CHECK_CODE(code, lino, _end);
×
1361
      }
1362
      memcpy(pNewPos->pRowBuff, tmpVal, len);
×
1363
      taosMemoryFreeClear(tmpVal);
×
1364
      *pVLen = getRowStateRowSize(pFileState);
×
1365
      (*ppVal) = pNewPos;
×
1366
    }
1367
    streamStateFreeCur(pCur);
×
1368
    return code;
×
1369
  }
1370
  int32_t size = taosArrayGetSize(pWinStates);
105✔
1371
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
105✔
1372
  if (index >= 0) {
105!
1373
    SWinKey* pCurKey = taosArrayGet(pWinStates, index);
105✔
1374
    if (winKeyCmprImpl(pCurKey, pKey) == 0) {
105!
1375
      index--;
105✔
1376
    } else {
1377
      qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__);
×
1378
    }
1379
  }
1380
  if (index == -1) {
105✔
1381
    SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
28✔
1382
    void*            tmpVal = NULL;
28✔
1383
    int32_t          len = 0;
28✔
1384
    (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
28✔
1385
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
28!
1386
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
1387
      if (!pNewPos || !pNewPos->pRowBuff) {
×
1388
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1389
        QUERY_CHECK_CODE(code, lino, _end);
×
1390
      }
1391
      memcpy(pNewPos->pRowBuff, tmpVal, len);
×
1392
      taosMemoryFreeClear(tmpVal);
×
1393
      *pVLen = getRowStateRowSize(pFileState);
×
1394
      (*ppVal) = pNewPos;
×
1395
    }
1396
    streamStateFreeCur(pCur);
28✔
1397
    return code;
28✔
1398
  } else {
1399
    SWinKey* pPrevKey = taosArrayGet(pWinStates, index);
77✔
1400
    *pResKey = *pPrevKey;
77✔
1401
    return addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), ppVal, pVLen, pWinCode);
77✔
1402
  }
1403
  (*pWinCode) = TSDB_CODE_FAILED;
1404

1405
_end:
×
1406
  if (code != TSDB_CODE_SUCCESS) {
×
1407
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1408
  }
1409
  return code;
×
1410
}
1411

1412
int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey) {
2,001✔
1413
  int32_t code = TSDB_CODE_SUCCESS;
2,001✔
1414
  int32_t lino = 0;
2,001✔
1415
  int32_t size = taosArrayGetSize(pWinStates);
2,001✔
1416
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
2,001✔
1417
  if (!isFlushedState(pFileState, pKey->ts, 0) || index >= 0 || size == 0) {
2,001!
1418
    if (index >= 0) {
2,001✔
1419
      SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
1,810✔
1420
      if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
1,810✔
1421
        goto _end;
1,319✔
1422
      }
1423
    }
1424
    index++;
682✔
1425
    void* tmp = taosArrayInsert(pWinStates, index, pKey);
682✔
1426
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
682!
1427
  }
1428

1429
  if (size >= MAX_NUM_OF_CACHE_WIN) {
682!
1430
    int32_t num = size - NUM_OF_CACHE_WIN;
×
1431
    taosArrayRemoveBatch(pWinStates, 0, num, NULL);
×
1432
  }
1433
_end:
682✔
1434
  if (code != TSDB_CODE_SUCCESS) {
2,001!
1435
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1436
  }
1437
  return code;
2,001✔
1438
}
1439

1440
int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates) {
3,075✔
1441
  int32_t code = TSDB_CODE_SUCCESS;
3,075✔
1442
  int32_t lino = 0; 
3,075✔
1443
  SArray*    pWinStates = NULL;
3,075✔
1444
  void**     ppBuff = tSimpleHashGet(pSearchBuff, &groupId, sizeof(uint64_t));
3,075✔
1445
  if (ppBuff) {
3,075✔
1446
    pWinStates = (SArray*)(*ppBuff);
2,896✔
1447
  } else {
1448
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
179✔
1449
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);
179!
1450

1451
    code = tSimpleHashPut(pSearchBuff, &groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
179✔
1452
    QUERY_CHECK_CODE(code, lino, _end);
179!
1453
  }
1454

1455
  (*ppResStates) = pWinStates;
3,075✔
1456

1457
_end:
3,075✔
1458
  if (code != TSDB_CODE_SUCCESS) {
3,075!
1459
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1460
  }
1461
  return code;
3,075✔
1462
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc