• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3720

26 Mar 2025 06:20AM UTC coverage: 30.242% (-31.7%) from 61.936%
#3720

push

travis-ci

web-flow
feat(taosBenchmark): supports decimal data type (#30456)

* feat: taosBenchmark supports decimal data type

* build: decimal script not use pytest.sh

* fix: fix typo for decimal script

* test: insertBasic.py debug

71234 of 313946 branches covered (22.69%)

Branch coverage included in aggregate %.

38 of 423 new or added lines in 8 files covered. (8.98%)

120240 existing lines in 447 files now uncovered.

118188 of 312400 relevant lines covered (37.83%)

1450220.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/stream/src/tstreamFileState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "taos.h"
21
#include "tcommon.h"
22
#include "tcompare.h"
23
#include "thash.h"
24
#include "tsimplehash.h"
25

26
#define FLUSH_RATIO                    0.5
27
#define FLUSH_NUM                      4
28
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
29
#define MIN_NUM_OF_ROW_BUFF            10240
30
#define MIN_NUM_OF_RECOVER_ROW_BUFF    128
31
#define MIN_NUM_SEARCH_BUCKET          128
32
#define MAX_ARRAY_SIZE                 1024
33
#define MAX_GROUP_ID_NUM               200000
34
#define NUM_OF_CACHE_WIN               64
35
#define MAX_NUM_OF_CACHE_WIN           128
36
#define MIN_NUM_OF_SORT_CACHE_WIN      40960
37
#define BATCH_LIMIT                    256
38

39
#define DEFAULT_STATE_MAP_CAPACITY 10240
40
#define MAX_STATE_MAP_SIZE         10240000
41

42
// Flag helpers that operate on the last byte of a len-byte buffer starting at ptr.
// Macro arguments are parenthesized so that expression arguments (e.g. a ternary
// passed as len) expand with the intended precedence.
// SET_TSDATA_FLAG turns bit 0 of that byte on.
#define SET_TSDATA_FLAG(ptr, len)   ((*(char*)POINTER_SHIFT((ptr), ((len) - 1))) |= 1)
// UNSET_TSDATA_FLAG zeroes the whole trailing byte (not only bit 0).
// NOTE(review): presumably the byte carries nothing but this flag - confirm.
#define UNSET_TSDATA_FLAG(ptr, len) ((*(char*)POINTER_SHIFT((ptr), ((len) - 1))) &= 0)
// HAS_TSDATA_FLAG evaluates to non-zero when bit 0 of that byte is set.
#define HAS_TSDATA_FLAG(ptr, len)   ((*(char*)POINTER_SHIFT((ptr), ((len) - 1))) & 1)
45

46
#define TASK_KEY               "streamFileState"
47
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
48

49
struct SStreamFileState {
50
  SList*     usedBuffs;
51
  SList*     freeBuffs;
52
  void*      rowStateBuff;
53
  void*      pFileStore;
54
  int32_t    rowSize;
55
  int32_t    selectivityRowSize;
56
  int32_t    keyLen;
57
  uint64_t   preCheckPointVersion;
58
  uint64_t   checkPointVersion;
59
  TSKEY      maxTs;
60
  TSKEY      deleteMark;
61
  TSKEY      flushMark;
62
  uint64_t   maxRowCount;
63
  uint64_t   curRowCount;
64
  GetTsFun   getTs;
65
  char*      id;
66
  char*      cfName;
67
  void*      searchBuff;
68
  SSHashObj* pGroupIdMap;
69
  bool       hasFillCatch;
70
  SSHashObj* pRecFlagMap;
71

72
  _state_buff_cleanup_fn         stateBuffCleanupFn;
73
  _state_buff_remove_fn          stateBuffRemoveFn;
74
  _state_buff_remove_by_pos_fn   stateBuffRemoveByPosFn;
75
  _state_buff_create_statekey_fn stateBuffCreateStateKeyFn;
76

77
  _state_file_remove_fn stateFileRemoveFn;
78
  _state_file_get_fn    stateFileGetFn;
79

80
  _state_fun_get_fn stateFunctionGetFn;
81
};
82

83
typedef SRowBuffPos SRowBuffInfo;
84

UNCOV
85
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
×
UNCOV
86
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
×
UNCOV
87
  return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
×
88
}
89

UNCOV
90
int fillTSKeyCompare(const void* pKey1, const void* pDatas, int pos) {
×
UNCOV
91
  SWinKey* pWin1 = (SWinKey*)pKey1;
×
UNCOV
92
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
×
UNCOV
93
  if (pWin1->ts > pWin2->ts) {
×
UNCOV
94
    return 1;
×
UNCOV
95
  } else if (pWin1->ts < pWin2->ts) {
×
UNCOV
96
    return -1;
×
97
  }
98

UNCOV
99
  return 0;
×
100
}
101

UNCOV
102
// Removes pKey from the hash pBuff. When the key is present, the mapped row
// position is first marked as flushed and invalid. Returns the result of
// tSimpleHashRemove.
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
  SRowBuffPos** ppEntry = tSimpleHashGet(pBuff, pKey, keyLen);
  if (ppEntry != NULL) {
    SRowBuffPos* pEntry = *ppEntry;
    pEntry->beFlushed = true;
    pEntry->invalid = true;
  }
  return tSimpleHashRemove(pBuff, pKey, keyLen);
}
110

UNCOV
111
// Drops the hash entry for pPos->pKey, but only when the entry still maps to
// this very position (a different position under the same key is left alone).
void stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  size_t        keyLen = pFileState->keyLen;
  SRowBuffPos** ppMapped = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
  if (ppMapped == NULL || *ppMapped != pPos) {
    return;
  }

  int32_t tmpRes = tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
  qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
×
121

122
// Clears the simple hash pBuff via tSimpleHashClear.
void stateHashBuffClearFn(void* pBuff) {
  tSimpleHashClear(pBuff);
}
×
123

UNCOV
124
// Releases the simple hash pBuff via tSimpleHashCleanup.
void stateHashBuffCleanupFn(void* pBuff) {
  tSimpleHashCleanup(pBuff);
}
×
125

UNCOV
126
// File-store removal callback for the hash buffer types: forwards pKey to
// streamStateDel_rocksdb and returns its result.
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
  int32_t res = streamStateDel_rocksdb(pFileState->pFileStore, pKey);
  return res;
}
129

UNCOV
130
// File-store lookup callback for the hash buffer types: forwards to
// streamStateGet_rocksdb, which fills *data / *pDataLen, and returns its result.
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
  int32_t res = streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
  return res;
}
133

UNCOV
134
// Allocates an SStateKey holding a copy of the SWinKey at pPos->pKey and the
// operator number num. Returns NULL (after logging) when allocation fails.
// The returned memory comes from taosMemoryCalloc.
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SStateKey* pResult = taosMemoryCalloc(1, sizeof(SStateKey));
  if (pResult != NULL) {
    pResult->key = *(SWinKey*)pPos->pKey;
    pResult->opNum = num;
    return pResult;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
145

UNCOV
146
// Allocates an SWinKey that is a copy of the one at pPos->pKey; num is not
// used. Returns NULL (after logging) when allocation fails.
void* defaultCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SWinKey* pResult = taosMemoryCalloc(1, sizeof(SWinKey));
  if (pResult != NULL) {
    *pResult = *(SWinKey*)pPos->pKey;
    return pResult;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
156

UNCOV
157
// File-store removal callback for the sort buffer type: forwards pKey to
// streamStateSessionDel_rocksdb and returns its result.
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
  int32_t res = streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
  return res;
}
160

UNCOV
161
// File-store lookup callback for the sort buffer type: forwards to
// streamStateSessionGet_rocksdb, which fills *data / *pDataLen, and returns its result.
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
  int32_t res = streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
  return res;
}
164

UNCOV
165
// Allocates an SStateSessionKey holding a copy of the SSessionKey at
// pPos->pKey and the operator number num. Returns NULL (after logging) when
// allocation fails.
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SStateSessionKey* pResult = taosMemoryCalloc(1, sizeof(SStateSessionKey));
  if (pResult != NULL) {
    pResult->key = *(SSessionKey*)pPos->pKey;
    pResult->opNum = num;
    return pResult;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
176

UNCOV
177
// Decodes one fixed-width 64-bit value from pBuff into *pKey. len is not
// consulted here; the caller in this file checks it against sizeof(TSKEY) first.
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) {
  (void)len;
  (void)taosDecodeFixedI64(pBuff, pKey);
}
×
178

UNCOV
179
// Encodes *pKey as a fixed-width 64-bit value into a freshly allocated buffer.
// On return *pLen is sizeof(TSKEY) and *pVal is the buffer (NULL on allocation
// failure, in which case terrno is returned). The caller owns *pVal.
static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
  *pLen = sizeof(TSKEY);
  void* pOut = taosMemoryCalloc(1, *pLen);
  *pVal = pOut;
  if (pOut == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  // the encoder advances its cursor argument, so work on a copy of the pointer
  void* pCursor = pOut;
  (void)taosEncodeFixedI64(&pCursor, *pKey);
  return TSDB_CODE_SUCCESS;
}
190

UNCOV
191
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
×
192
                            void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type,
193
                            SStreamFileState** ppFileState) {
UNCOV
194
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
195
  int32_t lino = 0;
×
UNCOV
196
  if (memSize <= 0) {
×
197
    memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
×
198
  }
UNCOV
199
  if (rowSize == 0) {
×
200
    code = TSDB_CODE_INVALID_PARA;
×
201
    QUERY_CHECK_CODE(code, lino, _end);
×
202
  }
203

UNCOV
204
  SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
×
UNCOV
205
  QUERY_CHECK_NULL(pFileState, code, lino, _end, terrno);
×
206

UNCOV
207
  rowSize += selectRowSize;
×
UNCOV
208
  pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
×
UNCOV
209
  pFileState->usedBuffs = tdListNew(POINTER_BYTES);
×
UNCOV
210
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
×
211

UNCOV
212
  pFileState->freeBuffs = tdListNew(POINTER_BYTES);
×
UNCOV
213
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
×
214

UNCOV
215
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
UNCOV
216
  int32_t    cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
×
UNCOV
217
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
×
UNCOV
218
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
×
UNCOV
219
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
×
UNCOV
220
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
×
UNCOV
221
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
×
UNCOV
222
    pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
×
223

UNCOV
224
    pFileState->stateFileRemoveFn = intervalFileRemoveFn;
×
UNCOV
225
    pFileState->stateFileGetFn = intervalFileGetFn;
×
UNCOV
226
    pFileState->cfName = taosStrdup("state");
×
UNCOV
227
    pFileState->stateFunctionGetFn = addRowBuffIfNotExist;
×
UNCOV
228
  } else if (type == STREAM_STATE_BUFF_SORT) {
×
UNCOV
229
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
×
UNCOV
230
    pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
×
UNCOV
231
    pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
×
UNCOV
232
    pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn;
×
UNCOV
233
    pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
×
234

UNCOV
235
    pFileState->stateFileRemoveFn = sessionFileRemoveFn;
×
UNCOV
236
    pFileState->stateFileGetFn = sessionFileGetFn;
×
UNCOV
237
    pFileState->cfName = taosStrdup("sess");
×
UNCOV
238
    pFileState->stateFunctionGetFn = getSessionRowBuff;
×
UNCOV
239
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
×
UNCOV
240
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
×
UNCOV
241
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
×
UNCOV
242
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
×
UNCOV
243
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
×
UNCOV
244
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
×
UNCOV
245
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
×
UNCOV
246
    pFileState->stateBuffCreateStateKeyFn = defaultCreateStateKey;
×
247

UNCOV
248
    pFileState->stateFileRemoveFn = hashSortFileRemoveFn;
×
UNCOV
249
    pFileState->stateFileGetFn = hashSortFileGetFn;
×
UNCOV
250
    pFileState->cfName = taosStrdup("fill");
×
UNCOV
251
    pFileState->stateFunctionGetFn = NULL;
×
252
  }
253

UNCOV
254
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
×
UNCOV
255
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
×
UNCOV
256
  QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno);
×
UNCOV
257
  QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno);
×
258

UNCOV
259
  if (type == STREAM_STATE_BUFF_HASH_SEARCH) {
×
UNCOV
260
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
×
UNCOV
261
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
×
262
  }
263

UNCOV
264
  pFileState->keyLen = keySize;
×
UNCOV
265
  pFileState->rowSize = rowSize;
×
UNCOV
266
  pFileState->selectivityRowSize = selectRowSize;
×
UNCOV
267
  pFileState->preCheckPointVersion = 0;
×
UNCOV
268
  pFileState->checkPointVersion = 1;
×
UNCOV
269
  pFileState->pFileStore = pFile;
×
UNCOV
270
  pFileState->getTs = fp;
×
UNCOV
271
  pFileState->curRowCount = 0;
×
UNCOV
272
  pFileState->deleteMark = delMark;
×
UNCOV
273
  pFileState->flushMark = INT64_MIN;
×
UNCOV
274
  pFileState->maxTs = INT64_MIN;
×
UNCOV
275
  pFileState->id = taosStrdup(taskId);
×
UNCOV
276
  QUERY_CHECK_NULL(pFileState->id, code, lino, _end, terrno);
×
277

UNCOV
278
  pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
×
UNCOV
279
  QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
×
280

UNCOV
281
  pFileState->pRecFlagMap = tSimpleHashInit(1024, hashFn);
×
UNCOV
282
  QUERY_CHECK_NULL(pFileState->pRecFlagMap, code, lino, _end, terrno);
×
283

284

UNCOV
285
  pFileState->hasFillCatch = true;
×
286

UNCOV
287
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
×
UNCOV
288
    code = recoverSnapshot(pFileState, checkpointId);
×
UNCOV
289
  } else if (type == STREAM_STATE_BUFF_SORT) {
×
UNCOV
290
    code = recoverSession(pFileState, checkpointId);
×
UNCOV
291
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
×
UNCOV
292
    code = recoverFillSnapshot(pFileState, checkpointId);
×
293
  }
UNCOV
294
  QUERY_CHECK_CODE(code, lino, _end);
×
295

UNCOV
296
  void*   valBuf = NULL;
×
UNCOV
297
  int32_t len = 0;
×
UNCOV
298
  int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
×
UNCOV
299
  if (tmpRes == TSDB_CODE_SUCCESS) {
×
UNCOV
300
    QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
UNCOV
301
    streamFileStateDecode(&pFileState->flushMark, valBuf, len);
×
UNCOV
302
    qDebug("===stream===flushMark  read:%" PRId64, pFileState->flushMark);
×
303
  }
UNCOV
304
  taosMemoryFreeClear(valBuf);
×
UNCOV
305
  (*ppFileState) = pFileState;
×
306

UNCOV
307
_end:
×
UNCOV
308
  if (code != TSDB_CODE_SUCCESS) {
×
309
    streamFileStateDestroy(pFileState);
×
310
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
311
  }
UNCOV
312
  return code;
×
313
}
314

UNCOV
315
// Frees a row position together with the key and the row buffer it still
// owns. pPos must not be NULL.
void destroyRowBuffPos(SRowBuffPos* pPos) {
  taosMemoryFreeClear(pPos->pKey);
  taosMemoryFreeClear(pPos->pRowBuff);

  taosMemoryFree(pPos);
}
×
320

UNCOV
321
void destroyRowBuffPosPtr(void* ptr) {
×
UNCOV
322
  if (!ptr) {
×
323
    return;
×
324
  }
UNCOV
325
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
×
UNCOV
326
  if (!pPos->beUsed) {
×
UNCOV
327
    destroyRowBuffPos(pPos);
×
328
  }
329
}
330

UNCOV
331
void destroyRowBuffAllPosPtr(void* ptr) {
×
UNCOV
332
  if (!ptr) {
×
333
    return;
×
334
  }
UNCOV
335
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
×
UNCOV
336
  destroyRowBuffPos(pPos);
×
337
}
338

UNCOV
339
void destroyRowBuff(void* ptr) {
×
UNCOV
340
  if (!ptr) {
×
341
    return;
×
342
  }
UNCOV
343
  taosMemoryFree(*(void**)ptr);
×
344
}
345

UNCOV
346
// Releases a file state and everything it owns: the id and cfName strings,
// both buffer lists (with their positions / row buffers), the row-state and
// search buffers and the two auxiliary hashes. A NULL pFileState is ignored.
//
// The function is also the error path of streamFileStateInit, so it must cope
// with a partially initialized state.
void streamFileStateDestroy(SStreamFileState* pFileState) {
  if (!pFileState) {
    return;
  }

  taosMemoryFree(pFileState->id);
  taosMemoryFree(pFileState->cfName);
  tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
  tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
  // The cleanup callback is installed together with rowStateBuff; when init
  // failed before that point it is still NULL and there is nothing to clean.
  if (pFileState->stateBuffCleanupFn != NULL) {
    pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
  }
  sessionWinStateCleanup(pFileState->searchBuff);
  tSimpleHashCleanup(pFileState->pGroupIdMap);
  tSimpleHashCleanup(pFileState->pRecFlagMap);
  taosMemoryFree(pFileState);
}
361

UNCOV
362
// Moves the row buffer of pPos (if it has one) to the free list and detaches
// it from the position. Returns the tdListAppend error code on failure, in
// which case the buffer stays attached to pPos.
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pPos->pRowBuff == NULL) {
    goto _end;
  }

  code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
  QUERY_CHECK_CODE(code, lino, _end);
  pPos->pRowBuff = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
377

UNCOV
378
// Walks usedBuffs and recycles positions: every position when all is true,
// otherwise only unused positions whose timestamp is below ts. A recycled
// position gives its row buffer to the free list, is removed from the
// row-state buffer (skipped when all is true), is destroyed and is unlinked
// from usedBuffs. Stops at the first putFreeBuff failure, which is logged.
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  for (SListNode* pNode = tdListNext(&iter); pNode != NULL; pNode = tdListNext(&iter)) {
    SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
    bool         recycle = all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed);
    if (!recycle) {
      continue;
    }

    code = putFreeBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!all) {
      pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
    }
    destroyRowBuffPos(pPos);

    SListNode* pPopped = tdListPopNode(pFileState->usedBuffs, pNode);
    taosMemoryFreeClear(pPopped);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
405

UNCOV
406
// Collects positions that isFlushedState reports as flushed into pFlushList,
// until max positions that still own a row buffer have been collected.
// With all == false only unused positions are taken; with all == true used
// positions are taken as well, except those without a row buffer.
// A collected position is removed from the row-state buffer, advances
// flushMark, and is unlinked from usedBuffs when it is not in use.
int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool all) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  uint64_t  taken = 0;
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&iter)) != NULL && taken < max) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (!isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) {
      continue;
    }
    if (!all && pPos->beUsed) {
      continue;
    }
    if (all && !pPos->pRowBuff) {
      continue;
    }

    code = tdListAppend(pFlushList, &pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
    pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
    if (pPos->beUsed == false) {
      SListNode* pPopped = tdListPopNode(pFileState->usedBuffs, pNode);
      taosMemoryFreeClear(pPopped);
    }
    if (pPos->pRowBuff) {
      taken++;
    }
  }
  qDebug("clear flushed row buff. %d rows to disk. is all:%d", listNEles(pFlushList), all);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
444

UNCOV
445
// Resets the state to empty: both marks go back to INT64_MIN, the row-state
// hash is cleared and every tracked position is recycled.
void streamFileStateClear(SStreamFileState* pFileState) {
  pFileState->maxTs = INT64_MIN;
  pFileState->flushMark = INT64_MIN;

  tSimpleHashClear(pFileState->rowStateBuff);
  clearExpiredRowBuff(pFileState, 0, true);
}
×
451

UNCOV
452
// Returns true when flushMark is positive.
bool needClearDiskBuff(SStreamFileState* pFileState) {
  return pFileState->flushMark > 0;
}
×
453

UNCOV
454
// Sets the beUsed flag of pPos to used; pFileState is not touched.
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) {
  pPos->beUsed = used;
}
×
455

UNCOV
456
// Collects positions whose beUsed flag equals used into pFlushList, until max
// positions that still own a row buffer have been collected. Used positions
// without a row buffer are skipped. A collected position is removed from the
// row-state buffer, advances flushMark, and is unlinked from usedBuffs when
// it is not in use.
int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  uint64_t  taken = 0;
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&iter)) != NULL && taken < max) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (pPos->beUsed != used) {
      continue;
    }
    if (used && !pPos->pRowBuff) {
      continue;
    }

    code = tdListAppend(pFlushList, &pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
    pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
    if (pPos->beUsed == false) {
      SListNode* pPopped = tdListPopNode(pFileState->usedBuffs, pNode);
      taosMemoryFreeClear(pPopped);
    }
    if (pPos->pRowBuff) {
      taken++;
    }
  }

  qInfo("%s stream state flush %d rows to disk. is used:%d", pFileState->id, listNEles(pFlushList), used);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
493

UNCOV
494
// Picks a batch of row positions, writes them out with flushSnapshot and
// returns their row buffers to the free list.
//
// Selection order: already-flushed unused positions first; if none, unused
// positions; if still none, used positions. The batch size is
// max(curRowCount * FLUSH_RATIO, FLUSH_NUM). When a search buffer exists, all
// flushed positions are added as well.
//
// Returns TSDB_CODE_SUCCESS or the first error encountered. The temporary
// flush list is released on every path.
int32_t flushRowBuff(SStreamFileState* pFileState) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
  if (!pFlushList) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
  num = TMAX(num, FLUSH_NUM);
  code = clearFlushedRowBuff(pFileState, pFlushList, num, false);
  QUERY_CHECK_CODE(code, lino, _end);

  if (isListEmpty(pFlushList)) {
    code = popUsedBuffs(pFileState, pFlushList, num, false);
    QUERY_CHECK_CODE(code, lino, _end);

    if (isListEmpty(pFlushList)) {
      code = popUsedBuffs(pFileState, pFlushList, num, true);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  if (pFileState->searchBuff) {
    code = clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  flushSnapshot(pFileState, pFlushList, false);

  SListIter fIter = {0};
  tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&fIter)) != NULL) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    code = putFreeBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  // Free the list on the error paths too: positions that were already
  // unlinked from usedBuffs are referenced only from here and would leak
  // otherwise. destroyRowBuffPosPtr leaves positions marked beUsed alone.
  if (pFlushList != NULL) {
    tdListFreeP(pFlushList, destroyRowBuffPosPtr);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
542

UNCOV
543
// Frees up row buffers: first expires positions older than
// maxTs - deleteMark (unless deleteMark is INT64_MAX), then flushes batches
// until the free list is non-empty or the buffer budget is not exhausted.
// Returns the first flushRowBuff error, otherwise TSDB_CODE_SUCCESS.
int32_t clearRowBuff(SStreamFileState* pFileState) {
  if (pFileState->deleteMark != INT64_MAX) {
    clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
  }

  for (;;) {
    int32_t code = flushRowBuff(pFileState);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    bool stillStarved = isListEmpty(pFileState->freeBuffs) && pFileState->curRowCount == pFileState->maxRowCount;
    if (!stillStarved) {
      break;
    }
  }
  return TSDB_CODE_SUCCESS;
}
555

UNCOV
556
// Recycles positions that are marked invalid and are not in use: each one is
// unlinked from usedBuffs, its row buffer goes to the free list and the
// position is destroyed. Stops once max positions that owned a row buffer
// have been recycled, or at the first putFreeBuff failure.
int32_t clearFlushedRowBuffByFlag(SStreamFileState* pFileState, uint64_t max) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  uint64_t  recycled = 0;
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&iter)) != NULL && recycled < max) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (!pPos->invalid || pPos->beUsed) {
      continue;
    }

    SListNode* pPopped = tdListPopNode(pFileState->usedBuffs, pNode);
    taosMemoryFreeClear(pPopped);
    if (pPos->pRowBuff) {
      recycled++;
    }
    code = putFreeBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
    destroyRowBuffPos(pPos);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
586

UNCOV
587
// Frees up row buffers without writing anything out: expires positions older
// than maxTs - deleteMark (unless deleteMark is INT64_MAX), then recycles up
// to max(curRowCount * FLUSH_RATIO, FLUSH_NUM) invalid positions.
int32_t clearRowBuffNonFlush(SStreamFileState* pFileState) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pFileState->deleteMark != INT64_MAX) {
    clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
  }

  uint64_t batch = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
  batch = TMAX(batch, FLUSH_NUM);

  code = clearFlushedRowBuffByFlag(pFileState, batch);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
606

UNCOV
607
// Takes one row buffer from the head of the free list, zeroes its rowSize
// bytes and returns it. Returns NULL when the free list is empty.
void* getFreeBuff(SStreamFileState* pFileState) {
  SListNode* pHead = tdListPopHead(pFileState->freeBuffs);
  if (pHead == NULL) {
    return NULL;
  }

  void* pRow = *(void**)pHead->data;
  taosMemoryFree(pHead);

  memset(pRow, 0, pFileState->rowSize);
  return pRow;
}
619

620
// Zeroes the rowSize bytes of the row buffer attached to pPos; does nothing
// when pPos has no row buffer.
void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  if (pPos->pRowBuff == NULL) {
    return;
  }
  memset(pPos->pRowBuff, 0, pFileState->rowSize);
}
×
625

UNCOV
626
// Creates a new row position with a zeroed key of keyLen bytes and a row
// buffer, and appends it to usedBuffs.
//
// The row buffer comes from, in order: the free list; a fresh allocation
// while curRowCount is below maxRowCount; the free list again after
// clearRowBuff has made room.
//
// Returns the new position, or NULL on failure. On failure nothing is
// leaked: the position is not left in usedBuffs and its memory is released.
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  bool         freshBuff = false;  // true when pRowBuff was allocated here rather than reused
  SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
  if (!pPos) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _error);
  }

  pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
  if (!pPos->pKey) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _error);
  }

  pPos->pRowBuff = getFreeBuff(pFileState);
  if (pPos->pRowBuff == NULL) {
    if (pFileState->curRowCount < pFileState->maxRowCount) {
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
      QUERY_CHECK_NULL(pPos->pRowBuff, code, lino, _error, terrno);
      freshBuff = true;
    } else {
      code = clearRowBuff(pFileState);
      QUERY_CHECK_CODE(code, lino, _error);

      pPos->pRowBuff = getFreeBuff(pFileState);
    }
  }

  // Verify the buffer before the position becomes visible in usedBuffs, so a
  // failure never leaves a buffer-less position behind.
  QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  code = tdListAppend(pFileState->usedBuffs, &pPos);
  QUERY_CHECK_CODE(code, lino, _error);

  // Count a fresh buffer only once the position is safely tracked.
  if (freshBuff) {
    pFileState->curRowCount++;
  }

_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    if (pPos != NULL) {
      // The position never made it into usedBuffs. A reused buffer goes back
      // to the free list (best effort); whatever is still attached afterwards
      // is freed together with the position.
      if (!freshBuff && pPos->pRowBuff != NULL) {
        int32_t tmpRes = putFreeBuff(pFileState, pPos);
        qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
      }
      destroyRowBuffPos(pPos);
    }
    return NULL;
  }

  return pPos;
}
673

UNCOV
674
// Obtains a new row position from getNewRowPos and marks it as in use,
// updated, not flushed, not to be freed and valid. Returns NULL (after
// logging an out-of-memory error) when no position could be created.
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
  SRowBuffPos* pFresh = getNewRowPos(pFileState);
  if (pFresh == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return NULL;
  }

  pFresh->beUsed = true;
  pFresh->beUpdated = true;
  pFresh->beFlushed = false;
  pFresh->needFree = false;
  pFresh->invalid = false;
  return pFresh;
}
695

UNCOV
696
// Looks up pKey in the row-state hash and creates a row position when it is
// missing.
//
//   pVal / pVLen  when pVal is not NULL, receive the row position and rowSize
//   pWinCode      TSDB_CODE_SUCCESS when the row already existed in memory or
//                 was loaded from the file store; TSDB_CODE_FAILED (or the
//                 file-store error) when a blank row was created
//
// A cached position is marked used / not flushed only when pVal is not NULL.
// For a new position the file store is consulted only if the timestamp is
// not deleted and falls in the flushed range.
// Returns TSDB_CODE_SUCCESS or an error code.
int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                             int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  (*pWinCode) = TSDB_CODE_SUCCESS;
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));

  SRowBuffPos** ppCached = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  if (ppCached != NULL) {
    if (pVal != NULL) {
      SRowBuffPos* pCached = *ppCached;
      *pVLen = pFileState->rowSize;
      *pVal = pCached;
      pCached->beUsed = true;
      pCached->beFlushed = false;
    }
    goto _end;
  }

  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
  if (pNewPos == NULL || pNewPos->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pNewPos->pKey, pKey, keyLen);
  (*pWinCode) = TSDB_CODE_FAILED;

  TSKEY ts = pFileState->getTs(pKey);
  bool  mayBeOnDisk = !isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0);
  if (mayBeOnDisk) {
    int32_t diskLen = 0;
    void*   pDiskVal = NULL;
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &pDiskVal, &diskLen);
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
      // NOTE(review): diskLen is not checked against rowSize here - confirm the stored value never exceeds it
      memcpy(pNewPos->pRowBuff, pDiskVal, diskLen);
    }
    taosMemoryFree(pDiskVal);
  }

  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pVal != NULL) {
    *pVLen = pFileState->rowSize;
    *pVal = pNewPos;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
747

UNCOV
748
// Creates a new row position for pKey unconditionally (no lookup and no
// file-store read) and stores it in the row-state hash. When pVal is not
// NULL it receives the position and *pVLen receives rowSize.
// Returns TSDB_CODE_SUCCESS or an error code.
int32_t createRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));

  SRowBuffPos* pCreated = getNewRowPosForWrite(pFileState);
  if (pCreated == NULL || pCreated->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pCreated->pKey, pKey, keyLen);

  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pCreated, POINTER_BYTES);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pVal != NULL) {
    *pVLen = pFileState->rowSize;
    *pVal = pCreated;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
776

UNCOV
777
/*
 * Remove the state stored under pKey: first through stateBuffRemoveFn on the in-memory
 * row map, then through stateFileRemoveFn, and finally from the sorted search buffer
 * when one exists. The results of the removals are only traced, never returned.
 */
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
  int32_t buffRes = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
  qTrace("%s at line %d res:%d", __func__, __LINE__, buffRes);

  int32_t fileRes = pFileState->stateFileRemoveFn(pFileState, pKey);
  qTrace("%s at line %d res:%d", __func__, __LINE__, fileRes);

  if (pFileState->searchBuff == NULL) {
    return;
  }
  deleteHashSortRowBuff(pFileState, pKey);
}
×
786

UNCOV
787
/*
 * Remove every entry that belongs to groupId, first from the in-memory row map and then
 * from the file store.
 *
 * The file-store pass repeatedly seeks to the first key of the group (ts = INT64_MIN) and
 * deletes whatever key the cursor yields, until the lookup fails. Each iteration opens a
 * new cursor, so the cursor is released before the next seek; a failed delete ends the
 * loop because retrying would find the same key again.
 */
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) {
  SSHashObj* pRowMap = pFileState->rowStateBuff;
  void*      pIte = NULL;
  int32_t    iter = 0;
  while ((pIte = tSimpleHashIterate(pRowMap, pIte, &iter)) != NULL) {
    size_t   keyLen = 0;
    SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
    if (pKey->groupId == groupId) {
      int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pIte, &iter);
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
    }
  }

  while (1) {
    SWinKey          tmp = {.ts = INT64_MIN, .groupId = groupId};
    SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &tmp);
    SWinKey          delKey = {.groupId = groupId};
    int32_t          code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0);
    // the cursor is only needed to read the key; release it on every iteration
    streamStateFreeCur(pCur);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
    code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey);
    qTrace("%s at line %d res:%d", __func__, __LINE__, code);
    if (code != TSDB_CODE_SUCCESS) {
      // the key is still there; seeking again would return it forever
      break;
    }
  }
}
×
812

UNCOV
813
/*
 * Reload the row content of pPos through stateFileGetFn and copy it into pPos->pRowBuff.
 * The buffer handed back by stateFileGetFn is released after the copy.
 * NOTE(review): the copy length is whatever stateFileGetFn reports; it is assumed to fit
 * in pPos->pRowBuff -- confirm against the callers.
 */
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pStored = NULL;
  int32_t storedLen = 0;

  code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pStored, &storedLen);
  QUERY_CHECK_CODE(code, lino, _end);

  memcpy(pPos->pRowBuff, pStored, storedLen);
  taosMemoryFree(pStored);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
829

UNCOV
830
/*
 * Give pPos a row buffer and fill it through recoverSessionRowBuff.
 *
 * Buffer acquisition order:
 *   1. a buffer from getFreeBuff();
 *   2. otherwise a fresh zeroed allocation while curRowCount < maxRowCount;
 *   3. otherwise clearRowBuff() followed by another getFreeBuff().
 * Ending up without a buffer is reported as TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 */
static int32_t recoverStateRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  pPos->pRowBuff = getFreeBuff(pFileState);
  if (pPos->pRowBuff == NULL) {
    if (pFileState->curRowCount >= pFileState->maxRowCount) {
      // quota exhausted: reclaim buffers, then retry the free list
      code = clearRowBuff(pFileState);
      QUERY_CHECK_CODE(code, lino, _end);
      pPos->pRowBuff = getFreeBuff(pFileState);
    } else {
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
      if (pPos->pRowBuff == NULL) {
        code = terrno;
        QUERY_CHECK_CODE(code, lino, _end);
      }
      pFileState->curRowCount++;
    }
    QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  code = recoverSessionRowBuff(pFileState, pPos);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
859

UNCOV
860
/*
 * Return, through pVal, the row buffer that belongs to pPos.
 *
 * - pPos has no buffer: one is acquired and filled by recoverStateRowBuff.
 * - pPos has a buffer and needFree is set: the content is reloaded by
 *   recoverSessionRowBuff before being returned.
 * - otherwise the existing buffer is returned as is.
 * *pVal is left untouched when a recover step fails.
 */
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pPos->pRowBuff == NULL) {
    code = recoverStateRowBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pPos->needFree) {
    code = recoverSessionRowBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  (*pVal) = pPos->pRowBuff;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
887

UNCOV
888
/*
 * Tell whether a row for pKey should be treated as present.
 *
 * The result is true when
 *   - pKey is in the in-memory row map, or
 *   - a search buffer exists but holds no array for pKey->groupId, or
 *   - hasLimit is set and the group's array holds at most MIN_NUM_OF_SORT_CACHE_WIN keys.
 *
 * pIsLast (optional) is set to true only when pKey equals the last key of the group's
 * array; it stays false when there is no search buffer, no array, or the array is empty.
 */
bool hasRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, bool hasLimit, bool* pIsLast) {
  bool res = false;
  if (pIsLast != NULL) {
    (*pIsLast) = false;
  }

  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
  if (pos) {
    res = true;
  }
  void* pSearchBuff = getSearchBuff(pFileState);
  if (pSearchBuff != NULL) {
    void** ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
    if (ppBuff != NULL) {
      SArray* pWinStates = (SArray*)(*ppBuff);
      // An empty array has no last element; comparing against it would dereference NULL.
      if (pIsLast != NULL && taosArrayGetSize(pWinStates) > 0) {
        SWinKey* pLastKey = (SWinKey*)taosArrayGetLast(pWinStates);
        *pIsLast = (pLastKey != NULL && winKeyCmprImpl(pKey, pLastKey) == 0);
      }
      if (hasLimit && taosArrayGetSize(pWinStates) <= MIN_NUM_OF_SORT_CACHE_WIN) {
        res = true;
      }
      if (qDebugFlag & DEBUG_DEBUG) {
        if (taosArrayGetSize(pWinStates) > 0) {
          SWinKey* pFirstKey = (SWinKey*)taosArrayGet(pWinStates, 0);
          qDebug("===stream===check window state. buff min ts:%" PRId64 ",groupId:%" PRIu64 ".key ts:%" PRId64
                 ",groupId:%" PRIu64,
                 pFirstKey->ts, pFirstKey->groupId, pKey->ts, pKey->groupId);
        }
      }
    } else {
      res = true;
    }
  }
  return res;
}
924

UNCOV
925
/*
 * Clear expired row buffers and return the list of used buffers as the snapshot.
 * The expiry mark is maxTs - deleteMark, or INT64_MIN when deleteMark is INT64_MAX or
 * maxTs is still INT64_MIN.
 */
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
  int64_t mark = INT64_MIN;
  if (pFileState->deleteMark != INT64_MAX && pFileState->maxTs != INT64_MIN) {
    mark = pFileState->maxTs - pFileState->deleteMark;
  }

  clearExpiredRowBuff(pFileState, mark, false);
  return pFileState->usedBuffs;
}
932

UNCOV
933
/*
 * Write every not-yet-flushed row of pSnapshot to the file store in batches.
 *
 * pFileState  file state whose store (pFileStore) receives the rows
 * pSnapshot   list of SRowBuffPos* to walk
 * flushState  when true, the encoded flushMark is written as well after the rows
 *
 * Rows that are already flagged beFlushed or have no row buffer are skipped. A batch is
 * submitted whenever it reaches BATCH_LIMIT entries and once more after the walk. If the
 * final batch is empty the function stops there, so the search buffer is not cleared and
 * flushMark is not persisted in that case. Errors are logged; nothing is returned.
 *
 * buf and batch are initialised before the first possible jump to _end so that the
 * cleanup code never sees an indeterminate pointer.
 */
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  char*     buf = NULL;
  void*     batch = NULL;
  SListIter iter = {0};
  tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);

  int64_t    st = taosGetTimestampMs();
  SListNode* pNode = NULL;

  int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);

  // scratch space handed to streamStatePutBatchOptimize; zeroed again after every row
  int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
  buf = taosMemoryCalloc(1, len);
  if (!buf) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  batch = streamStateCreateBatch();
  if (!batch) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (pPos->beFlushed || !pPos->pRowBuff) {
      continue;
    }
    pPos->beFlushed = true;
    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));

    qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
      code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
      streamStateClearBatch(batch);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
    QUERY_CHECK_NULL(pSKey, code, lino, _end, terrno);

    code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
                                       0, buf);
    taosMemoryFreeClear(pSKey);
    QUERY_CHECK_CODE(code, lino, _end);
    // todo handle failure
    memset(buf, 0, len);
  }
  taosMemoryFreeClear(buf);

  int32_t numOfElems = streamStateGetBatchSize(batch);
  if (numOfElems > 0) {
    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    goto _end;
  }

  streamStateClearBatch(batch);

  clearSearchBuff(pFileState);

  int64_t elapsed = taosGetTimestampMs() - st;
  qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
         pFileState->id, numOfElems, BATCH_LIMIT, elapsed);

  if (flushState) {
    void*   valBuf = NULL;
    int32_t valLen = 0;
    code = streamFileStateEncode(&pFileState->flushMark, &valBuf, &valLen);
    QUERY_CHECK_CODE(code, lino, _end);

    qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
    code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, valLen, 0);
    taosMemoryFree(valBuf);
    QUERY_CHECK_CODE(code, lino, _end);

    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(buf);
  streamStateDestroyBatch(batch);
}
×
1022

1023
/*
 * Delete the default-store entry whose key is "<TASK_KEY>:<checkpointId>" and return
 * the result of streamDefaultDel_rocksdb.
 */
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
  char ckptKey[128] = {0};

  TAOS_UNUSED(tsnprintf(ckptKey, sizeof(ckptKey), "%s:%" PRId64, TASK_KEY, checkpointId));

  int32_t res = streamDefaultDel_rocksdb(pFileState->pFileStore, ckptKey);
  return res;
}
1028

1029
/*
 * Remove one expired checkpoint entry from the default store.
 *
 * The value stored under TASK_KEY is parsed as the highest checkpoint id. Ids are then
 * walked downwards; for each, the value stored under "<TASK_KEY>:<id>" is parsed as a
 * timestamp. The first id whose timestamp is below mark is removed with
 * forceRemoveCheckpoint and the walk stops.
 *
 * Returns TSDB_CODE_FAILED when a lookup fails or a stored value does not fit the local
 * 128-byte parse buffer; otherwise the code of the last lookup.
 *
 * Stored values come from the store and are copied into a fixed-size stack buffer, so
 * their length is validated before every copy, and the value buffer is released on every
 * exit path.
 */
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t maxCheckPointId = 0;
  {
    char    buf[128] = {0};
    void*   val = NULL;
    int32_t len = 0;
    memcpy(buf, TASK_KEY, strlen(TASK_KEY));
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
    if (code != 0 || len <= 0 || len >= (int32_t)sizeof(buf) || val == NULL) {
      taosMemoryFree(val);
      return TSDB_CODE_FAILED;
    }
    memcpy(buf, val, len);
    buf[len] = 0;
    maxCheckPointId = taosStr2Int64((char*)buf, NULL, 10);
    taosMemoryFree(val);
  }
  for (int64_t i = maxCheckPointId; i > 0; i--) {
    char    buf[128] = {0};
    void*   val = NULL;
    int32_t len = 0;
    TAOS_UNUSED(tsnprintf(buf, sizeof(buf), "%s:%" PRId64, TASK_KEY, i));
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
    if (code != 0 || len < 0 || len >= (int32_t)sizeof(buf) || (val == NULL && len > 0)) {
      taosMemoryFree(val);
      return TSDB_CODE_FAILED;
    }
    if (len > 0) {
      memcpy(buf, val, len);
    }
    buf[len] = 0;
    taosMemoryFree(val);

    TSKEY ts;
    ts = taosStr2Int64((char*)buf, NULL, 10);
    if (ts < mark) {
      // statekey winkey.ts < mark
      int32_t tmpRes = forceRemoveCheckpoint(pFileState, i);
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
      break;
    }
  }
  return code;
}
1070

UNCOV
1071
/*
 * Reload session-window rows from the file store into memory, newest first.
 *
 * Before loading, one expired checkpoint entry is removed when maxTs is known (the result
 * is only traced). Rows are then read backwards from the last session key until
 * curRowCount reaches min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount) or the cursor is
 * exhausted.
 *
 * ckId is not used by this function.
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a stored row does not have the
 * expected size, TSDB_CODE_OUT_OF_MEMORY when no window buffer could be created, and
 * TSDB_CODE_SUCCESS otherwise (cursor/insert failures simply end the loop).
 */
int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t winRes = TSDB_CODE_SUCCESS;
  if (pFileState->maxTs != INT64_MIN) {
    // guard the subtraction against wrapping below INT64_MIN
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
  }

  SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore, INT64_MAX);
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  while (winRes == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*       pVal = NULL;
    int32_t     vlen = 0;
    SSessionKey key = {0};
    winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }

    if (vlen != pFileState->rowSize) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("[InternalERR] read key:[skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],vlen:%d, rowSize:%d", key.win.skey, key.win.ekey, key.groupId, vlen, pFileState->rowSize);
      // the value has not been handed to createSessionWinBuff yet, so release it here
      taosMemoryFreeClear(pVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
    if (pPos == NULL) {
      // NOTE(review): pVal is not released here because createSessionWinBuff was given it;
      // confirm who owns the value when that call fails.
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    pPos->beUsed = false;
    winRes = putSessionWinResultBuff(pFileState, pPos);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }

    winRes = streamStateSessionCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
1121

UNCOV
1122
/*
 * Reload rows from the file store into the in-memory row map, newest first.
 *
 * Before loading, one expired checkpoint entry is removed when maxTs is known (the result
 * is only traced). Rows are read backwards from the last key until curRowCount reaches
 * min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount), the cursor is exhausted, or a key older
 * than flushMark is met. Loaded positions are flagged beFlushed and not beUsed.
 *
 * ckId is not used by this function.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when no row position could be obtained,
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR on a row-size mismatch, the tSimpleHashPut error
 * when registration fails, and TSDB_CODE_SUCCESS otherwise.
 */
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t winCode = TSDB_CODE_SUCCESS;
  if (pFileState->maxTs != INT64_MIN) {
    // guard the subtraction against wrapping below INT64_MIN
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
  }

  SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  while (winCode == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*        pVal = NULL;
    int32_t      vlen = 0;
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    winCode =
        streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
    qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__);
    if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
      // the position is not going to be used: destroy it and drop its list node
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      taosMemoryFreeClear(pVal);
      break;
    }
    if (vlen != pFileState->rowSize) {
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      taosMemoryFreeClear(pVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    memcpy(pNewPos->pRowBuff, pVal, vlen);
    taosMemoryFreeClear(pVal);
    pNewPos->beFlushed = true;
    pNewPos->beUsed = false;
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
    code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // Same pairing as the early-exit path above: a destroyed position must not stay
      // referenced from usedBuffs (presumably it is the tail node -- the path above
      // relies on the same assumption).
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      break;
    }
    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
1185

UNCOV
1186
/* Accessor: returns pFileState->selectivityRowSize. */
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) {
  int32_t selectRowSize = pFileState->selectivityRowSize;
  return selectRowSize;
}
×
1187

UNCOV
1188
/* Raise flushMark and maxTs to ts when ts is larger than their current values. */
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
  if (ts > pFileState->flushMark) {
    pFileState->flushMark = ts;
  }
  if (ts > pFileState->maxTs) {
    pFileState->maxTs = ts;
  }
}
×
1192

UNCOV
1193
/* Accessor: returns the in-memory row map (pFileState->rowStateBuff). */
void* getRowStateBuff(SStreamFileState* pFileState) {
  void* pRowMap = pFileState->rowStateBuff;
  return pRowMap;
}
×
UNCOV
1194
/* Accessor: returns pFileState->searchBuff (may be NULL; callers in this file check for it). */
void* getSearchBuff(SStreamFileState* pFileState) {
  void* pSearch = pFileState->searchBuff;
  return pSearch;
}
×
1195

UNCOV
1196
/* Accessor: returns pFileState->pFileStore. */
void* getStateFileStore(SStreamFileState* pFileState) {
  void* pStore = pFileState->pFileStore;
  return pStore;
}
×
1197

UNCOV
1198
/*
 * Tell whether ts lies below the delete horizon (maxTs - deleteMark).
 * Always false when deleteMark is INT64_MAX or maxTs is not positive.
 */
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
  if (pFileState->deleteMark == INT64_MAX) {
    return false;
  }
  if (pFileState->maxTs <= 0) {
    return false;
  }
  return ts < (pFileState->maxTs - pFileState->deleteMark);
}
1202

UNCOV
1203
/* True when ts is not greater than flushMark + gap. */
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) {
  return (pFileState->flushMark + gap) >= ts;
}
×
1204

UNCOV
1205
/* Accessor: returns pFileState->flushMark. */
TSKEY getFlushMark(SStreamFileState* pFileState) { return pFileState->flushMark; }
×
1206

UNCOV
1207
/* Accessor: returns pFileState->rowSize. */
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
  int32_t rowSize = pFileState->rowSize;
  return rowSize;
}
×
1208

UNCOV
1209
/*
 * Forward to pFileState->stateFunctionGetFn and return its result. The window code that
 * callback reports is collected locally and discarded.
 */
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
  int32_t ignoredWinCode = TSDB_CODE_SUCCESS;
  int32_t res = pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen, &ignoredWinCode);
  return res;
}
1213

UNCOV
1214
/*
 * Reload fill-state rows from the file store into the in-memory row map, newest first.
 *
 * When maxTs is known, one expired checkpoint entry is removed first; a failure there is
 * returned. Rows are then read backwards from the last fill key until curRowCount reaches
 * min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount), the cursor is exhausted, or a key that is
 * already flushed (isFlushedState) is met. Loaded positions are flagged beFlushed and not
 * beUsed.
 *
 * ckId is not used by this function.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when no row position could be obtained and
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR on a row-size mismatch. A tSimpleHashPut failure
 * only ends the loop. The cursor is released on every path that created it.
 */
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamStateCur* pCur = NULL;  // initialised before the first jump to _end
  if (pFileState->maxTs != INT64_MIN) {
    // guard the subtraction against wrapping below INT64_MIN
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    code = deleteExpiredCheckPoint(pFileState, mark);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore);
  if (pCur == NULL) {
    return code;
  }
  int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  int32_t winRes = TSDB_CODE_SUCCESS;
  while (winRes == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*        pVal = NULL;
    int32_t      vlen = 0;
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
    qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__);
    if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
      // the position is not going to be used: destroy it and drop its list node
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      taosMemoryFreeClear(pVal);
      break;
    }

    if (vlen != pFileState->rowSize) {
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
      // Same pairing as the early-exit path above: a destroyed position must not stay
      // referenced from usedBuffs (presumably it is the tail node -- the path above
      // relies on the same assumption).
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      taosMemoryFreeClear(pVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    memcpy(pNewPos->pRowBuff, pVal, vlen);
    taosMemoryFreeClear(pVal);
    pNewPos->beFlushed = true;
    pNewPos->beUsed = false;
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
    winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
    if (winRes != TSDB_CODE_SUCCESS) {
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      break;
    }
    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  if (pCur != NULL) {
    streamStateFreeCur(pCur);
  }
  return code;
}
1277

UNCOV
1278
/*
 * Look up the row for pKey, loading it from the file store when it is not in memory.
 *
 * pFileState  file state to search
 * pKey/keyLen key bytes
 * pVal        out: receives the SRowBuffPos* of the row
 * pVLen       out: receives pFileState->rowSize
 * pWinCode    out: TSDB_CODE_SUCCESS when the row was found (in memory or in the store);
 *             otherwise TSDB_CODE_FAILED or the code reported by stateFileGetFn
 *
 * In-memory hits are marked beUsed and not beFlushed, and get their row buffer restored
 * when it is missing. The store is only consulted for keys that are not deleted
 * (isDeteled) and already flushed (isFlushedState).
 *
 * Returns TSDB_CODE_SUCCESS unless restoring, allocating or registering the row fails.
 * The buffer returned by stateFileGetFn is released on every path.
 */
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                   int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   p = NULL;  // value read from the file store; freed at _end
  (*pWinCode) = TSDB_CODE_FAILED;
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  if (ppPos) {
    *pVLen = pFileState->rowSize;
    *pVal = *ppPos;
    (*ppPos)->beUsed = true;
    (*ppPos)->beFlushed = false;
    (*pWinCode) = TSDB_CODE_SUCCESS;
    if ((*ppPos)->pRowBuff == NULL) {
      code = recoverStateRowBuff(pFileState, *ppPos);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }
  TSKEY ts = pFileState->getTs(pKey);
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
    int32_t len = 0;
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
      if (!pNewPos || !pNewPos->pRowBuff) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        QUERY_CHECK_CODE(code, lino, _end);
      }

      memcpy(pNewPos->pKey, pKey, keyLen);
      memcpy(pNewPos->pRowBuff, p, len);
      code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
      QUERY_CHECK_CODE(code, lino, _end);

      if (pVal) {
        *pVLen = pFileState->rowSize;
        *pVal = pNewPos;
      }
    }
  }

_end:
  taosMemoryFree(p);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1329

UNCOV
1330
/*
 * Record groupId in the group-id cache and in the file store.
 *
 * A non-NULL value is rejected with TSDB_CODE_INVALID_PARA. A group id that is already
 * cached is left alone. Otherwise it is added to the cache while the cache holds at most
 * MAX_GROUP_ID_NUM entries, and written through streamStatePutParTag_rocksdb in any case.
 */
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pGroupCache = pFileState->pGroupIdMap;

  if (value != NULL) {
    code = TSDB_CODE_INVALID_PARA;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (tSimpleHashGet(pGroupCache, &groupId, sizeof(int64_t)) != NULL) {
    // already known, nothing to do
    goto _end;
  }

  if (tSimpleHashGetSize(pGroupCache) <= MAX_GROUP_ID_NUM) {
    code = tSimpleHashPut(pGroupCache, &groupId, sizeof(int64_t), NULL, 0);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = streamStatePutParTag_rocksdb(pFileState->pFileStore, groupId, value, vLen);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1353

UNCOV
1354
/*
 * Advance a group cursor by one entry.
 *
 * While hashIter is not -1 the cursor walks the group-id cache; minGpId tracks the largest
 * group id visited. Once the cache is exhausted, hashIter becomes -1 and the cursor is
 * re-positioned in the file store with streamStateParTagSeekKeyNext_rocksdb(minGpId).
 * When hashIter is already -1 the cursor is advanced with streamStateCurNext.
 */
void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
  SStreamFileState* pFileState = (SStreamFileState*)pCur->pStreamFileState;

  if (pCur->hashIter == -1) {
    streamStateCurNext(pFileState->pFileStore, pCur);
    return;
  }

  int64_t curGroupId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
  pCur->minGpId = TMAX(pCur->minGpId, curGroupId);

  pCur->pHashData = tSimpleHashIterate(pFileState->pGroupIdMap, pCur->pHashData, &pCur->hashIter);
  if (pCur->pHashData == NULL) {
    pCur->hashIter = -1;
    streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur);
  }
}
1372

UNCOV
1373
/*
 * Read the group id the cursor currently points at into *pKey.
 * While the cursor is on the group-id cache (pHashData set) the key comes from the cache
 * and TSDB_CODE_SUCCESS is returned; otherwise the lookup is delegated to
 * streamStateParTagGetKVByCur_rocksdb. pVal and pVLen are not written by this function.
 */
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
  if (pCur->pHashData == NULL) {
    return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL);
  }

  *pKey = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
  return TSDB_CODE_SUCCESS;
}
1381

UNCOV
1382
/* Accessor: returns the group-id cache (pFileState->pGroupIdMap). */
SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
  SSHashObj* pGroupCache = pFileState->pGroupIdMap;
  return pGroupCache;
}
1385

UNCOV
1386
/*
 * Trim every group's key array in the search buffer down to numOfKeep entries, removing
 * the dropped keys from the in-memory buffer, from the file store when they are already
 * flushed, and from pRecFlagMap when that map is not empty. Afterwards
 * clearRowBuffNonFlush is run; its failure is only logged.
 *
 * When minTs is not INT64_MAX and a group holds more than numOfKeep keys, numOfKeep is
 * recomputed from the position of minTs in that group's array (never below
 * MIN_NUM_OF_SORT_CACHE_WIN).
 * NOTE(review): the recomputed numOfKeep carries over to the groups visited afterwards;
 * confirm that this is intended rather than a per-group value.
 */
void clearExpiredState(SStreamFileState* pFileState, int32_t numOfKeep, TSKEY minTs) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pGroups = pFileState->searchBuff;
  void*      pEntry = NULL;
  int32_t    cursor = 0;

  for (pEntry = tSimpleHashIterate(pGroups, pEntry, &cursor); pEntry != NULL;
       pEntry = tSimpleHashIterate(pGroups, pEntry, &cursor)) {
    SArray* pKeys = *((void**)pEntry);
    int32_t total = TARRAY_SIZE(pKeys);

    if (minTs != INT64_MAX && total > numOfKeep) {
      SWinKey probe = {.ts = minTs};
      probe.groupId = *(uint64_t*)tSimpleHashGetKey(pEntry, NULL);
      int32_t pos = binarySearch(pKeys, total, &probe, fillStateKeyCompare);
      numOfKeep = TMAX(total - pos, MIN_NUM_OF_SORT_CACHE_WIN);
      qDebug("modify numOfKeep, numOfKeep:%d. %s at line %d", numOfKeep, __func__, __LINE__);
    }

    int32_t numToDrop = total - numOfKeep;
    int32_t i = 0;
    while (i < numToDrop) {
      SWinKey* pExpired = taosArrayGet(pKeys, i);
      int32_t  buffRes = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pExpired, sizeof(SWinKey));
      qTrace("clear expired buff, ts:%" PRId64 ",groupid:%" PRIu64 ". %s at line %d res:%d", pExpired->ts, pExpired->groupId, __func__, __LINE__, buffRes);

      // only flushed keys can have a copy in the file store
      if (isFlushedState(pFileState, pExpired->ts, 0)) {
        int32_t fileRes = pFileState->stateFileRemoveFn(pFileState, pExpired);
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pExpired->ts, __func__, __LINE__, fileRes);
      }

      if (tSimpleHashGetSize(pFileState->pRecFlagMap) > 0) {
        tSimpleHashRemove(pFileState->pRecFlagMap, pExpired, sizeof(SWinKey));
      }
      i++;
    }
    taosArrayRemoveBatch(pKeys, 0, numToDrop, NULL);
  }

  code = clearRowBuffNonFlush(pFileState);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
1428

1429
#ifdef BUILD_NO_CALL
1430
/*
 * Make sure a row buffer exists for pKey (addRowBuffIfNotExist) and record the key in the
 * sorted per-group array of the search buffer.
 *
 * The group's array is created on first use. An empty array is first refilled through
 * recoverHashSortBuff when needClearDiskBuff() says so. The key is inserted after the
 * position found by binarySearch unless an equal key is already there; keys that are
 * flushed and have no predecessor in the array are not inserted. When the array held at
 * least MAX_NUM_OF_CACHE_WIN keys before the insert, the oldest
 * (size - NUM_OF_CACHE_WIN) keys are dropped.
 */
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
                              int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
  QUERY_CHECK_CODE(code, lino, _end);

  SArray*    pGroupKeys = NULL;
  SSHashObj* pSearchMap = getSearchBuff(pFileState);
  void**     ppFound = tSimpleHashGet(pSearchMap, &pKey->groupId, sizeof(uint64_t));
  if (ppFound == NULL) {
    pGroupKeys = taosArrayInit(16, sizeof(SWinKey));
    QUERY_CHECK_NULL(pGroupKeys, code, lino, _end, terrno);

    code = tSimpleHashPut(pSearchMap, &pKey->groupId, sizeof(uint64_t), &pGroupKeys, POINTER_BYTES);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    pGroupKeys = (SArray*)(*ppFound);
  }

  // recover
  if (taosArrayGetSize(pGroupKeys) == 0 && needClearDiskBuff(pFileState)) {
    recoverHashSortBuff(pFileState, pGroupKeys, pKey->groupId);
  }

  int32_t numKeys = taosArrayGetSize(pGroupKeys);
  int32_t pos = binarySearch(pGroupKeys, numKeys, pKey, fillStateKeyCompare);
  if (!isFlushedState(pFileState, pKey->ts, 0) || pos >= 0) {
    // pos is the last slot that is not greater than pKey; an exact match means the key
    // is already recorded
    if (pos >= 0) {
      SWinKey* pExisting = taosArrayGet(pGroupKeys, pos);
      if (winKeyCmprImpl(pExisting, pKey) == 0) {
        goto _end;
      }
    }
    pos++;
    void* pInserted = taosArrayInsert(pGroupKeys, pos, pKey);
    QUERY_CHECK_NULL(pInserted, code, lino, _end, terrno);
  }

  if (numKeys >= MAX_NUM_OF_CACHE_WIN) {
    int32_t numToDrop = numKeys - NUM_OF_CACHE_WIN;
    taosArrayRemoveBatch(pGroupKeys, 0, numToDrop, NULL);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1482
#endif
1483

UNCOV
1484
// Seeks the entry before pKey in the state store and, when one is found, copies its value
// into a newly acquired row buffer position.
//
// *pWinCode receives the result of the store lookup. On a successful lookup *ppVal is set
// to the new SRowBuffPos and *pVLen to getRowStateRowSize(pFileState).
// Returns TSDB_CODE_OUT_OF_MEMORY when no writable row buffer could be obtained,
// TSDB_CODE_SUCCESS otherwise. The cursor and the temporary value are released on every path.
static int32_t loadPrevRowFromStore(SStreamFileState* pFileState, void* pState, const SWinKey* pKey, SWinKey* pResKey,
                                    void** ppVal, int32_t* pVLen, int32_t* pWinCode) {
  int32_t          code = TSDB_CODE_SUCCESS;
  void*            tmpVal = NULL;
  int32_t          len = 0;
  SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);

  (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
  if ((*pWinCode) == TSDB_CODE_SUCCESS) {
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(pNewPos->pRowBuff, tmpVal, len);
      *pVLen = getRowStateRowSize(pFileState);
      (*ppVal) = pNewPos;
    }
    // Released on both the success and the out-of-memory path.
    taosMemoryFreeClear(tmpVal);
  }

  streamStateFreeCur(pCur);
  return code;
}

// Finds the row that precedes pKey within the same group.
//
// The per-group key array in the search buffer is consulted first. When the group has no
// array and needClearDiskBuff() is true, or when the array holds no key before pKey, the
// previous row is loaded from the state store instead.
//
// pResKey:      receives the key of the previous row.
// ppVal/pVLen:  receive the row buffer position and its length.
// pWinCode:     receives the lookup result (TSDB_CODE_FAILED when the group has no key
//               array and the disk buffer does not need clearing).
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
                           int32_t* pVLen, int32_t* pWinCode) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SArray*    pWinStates = NULL;
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
  void*      pState = getStateFileStore(pFileState);
  void**     ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else if (needClearDiskBuff(pFileState)) {
    qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
    code = loadPrevRowFromStore(pFileState, pState, pKey, pResKey, ppVal, pVLen, pWinCode);
    QUERY_CHECK_CODE(code, lino, _end);
    return code;
  } else {
    (*pWinCode) = TSDB_CODE_FAILED;
    return code;
  }

  int32_t size = taosArrayGetSize(pWinStates);
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
  if (index >= 0) {
    SWinKey* pCurKey = taosArrayGet(pWinStates, index);
    if (winKeyCmprImpl(pCurKey, pKey) == 0) {
      // The search landed on pKey itself; the previous row is one slot earlier.
      index--;
    } else {
      qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__,
             __LINE__);
    }
  }

  if (index == -1) {
    // No earlier key is cached for this group; fall back to the state store.
    code = loadPrevRowFromStore(pFileState, pState, pKey, pResKey, ppVal, pVLen, pWinCode);
    QUERY_CHECK_CODE(code, lino, _end);
    return code;
  }

  SWinKey* pPrevKey = taosArrayGet(pWinStates, index);
  *pResKey = *pPrevKey;
  return addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), ppVal, pVLen, pWinCode);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1559

UNCOV
1560
// Inserts pKey into the sorted key array pWinStates unless it is already present.
//
// Nothing is inserted when the key's timestamp is reported as flushed by isFlushedState(),
// the binary search finds no position (index < 0) and the array is not empty.
// *pIsEnd is written only when an insertion is attempted; it tells whether the new key was
// placed at or beyond the previous end of the array.
// Returns TSDB_CODE_SUCCESS, or terrno when the insertion fails.
int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey, bool* pIsEnd) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfKeys = taosArrayGetSize(pWinStates);
  int32_t pos = binarySearch(pWinStates, numOfKeys, pKey, fillTSKeyCompare);
  bool    flushed = isFlushedState(pFileState, pKey->ts, 0);

  if (flushed && pos < 0 && numOfKeys != 0) {
    goto _end;
  }

  if (pos >= 0) {
    SWinKey* pFound = taosArrayGet(pWinStates, pos);
    if (winKeyCmprImpl(pFound, pKey) == 0) {
      // Already cached.
      goto _end;
    }
  }

  pos += 1;
  (*pIsEnd) = (pos >= numOfKeys);
  void* pInserted = taosArrayInsert(pWinStates, pos, pKey);
  QUERY_CHECK_NULL(pInserted, code, lino, _end, terrno);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1584

UNCOV
1585
// Returns, through ppResStates, the SWinKey array registered for groupId in pSearchBuff,
// creating and registering an empty one when the group has no array yet.
//
// On failure *ppResStates is left untouched and a newly created array is destroyed.
// Returns TSDB_CODE_SUCCESS, terrno when the array cannot be created, or the
// tSimpleHashPut() error code.
int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pWinStates = NULL;
  void**  ppBuff = tSimpleHashGet(pSearchBuff, &groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);

    code = tSimpleHashPut(pSearchBuff, &groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // The pointer was not stored in the hash, so the array would otherwise be leaked.
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  (*ppResStates) = pWinStates;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1608

UNCOV
1609
// Fills pBuff (buffLen bytes) with a timestamp followed by an optional value.
//
// SET_TSDATA_FLAG is applied to the buffer first. When len is non-zero the first
// buffLen - 1 bytes are zeroed and pVal (len bytes) is copied right after the leading
// TSKEY; when len is zero only the leading TSKEY is written.
static void setValueBuff(TSKEY ts, char* pVal, int32_t len, char* pBuff, int32_t buffLen) {
  SET_TSDATA_FLAG(pBuff, buffLen);

  if (len != 0) {
    memset(pBuff, 0, buffLen - 1);
    memcpy(pBuff + sizeof(TSKEY), pVal, len);
  }

  // The timestamp always occupies the first sizeof(TSKEY) bytes.
  *(TSKEY*)pBuff = ts;
}
1619

UNCOV
1620
// Looks up the timestamp (and optional primary-key value) cached for tableUid and updates
// the cache with (lastTs, pLastPkVal) when that pair is newer.
//
// pCurTs/ppCurPkVal: receive the previously stored timestamp / primary-key value when one exists.
// lastTs/pLastPkVal/lastPkLen: the new candidate; lastPkLen == 0 means there is no primary key.
// pWinCode: TSDB_CODE_SUCCESS when a previous entry was found, otherwise TSDB_CODE_FAILED or
//           the result of the state-store lookup.
// Returns TSDB_CODE_SUCCESS, or the tSimpleHashPut() error code.
int32_t getAndSetTsData(STableTsDataState* pTsDataState, uint64_t tableUid, TSKEY* pCurTs, void** ppCurPkVal,
                        TSKEY lastTs, void* pLastPkVal, int32_t lastPkLen, int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    hasPk = (lastPkLen != 0);

  TSKEY* pDataVal = tSimpleHashGet(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t));
  if (pDataVal != NULL) {
    (*pWinCode) = TSDB_CODE_SUCCESS;
    *pCurTs = *pDataVal;
    if ((*pCurTs) < lastTs) {
      // The cached timestamp is older: overwrite the cached entry in place.
      setValueBuff(lastTs, pLastPkVal, lastPkLen, (char*)pDataVal, pTsDataState->pkValLen);
    } else {
      if (hasPk) {
        (*ppCurPkVal) = POINTER_SHIFT(pDataVal, sizeof(TSKEY));
        // Same timestamp: the primary-key comparison decides whether to overwrite.
        if ((*pCurTs) == lastTs && pTsDataState->comparePkColFn((*ppCurPkVal), pLastPkVal) < 0) {
          setValueBuff(lastTs, pLastPkVal, lastPkLen, (char*)pDataVal, pTsDataState->pkValLen);
        }
      }
    }
  } else {
    setValueBuff(lastTs, pLastPkVal, lastPkLen, pTsDataState->pPkValBuff, pTsDataState->pkValLen);
    int32_t size = tSimpleHashGetSize(pTsDataState->pTableTsDataMap);
    if (size < MAX_STATE_MAP_SIZE) {
      (*pWinCode) = TSDB_CODE_FAILED;
      code = tSimpleHashPut(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t), pTsDataState->pPkValBuff,
                            pTsDataState->pkValLen);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      // The in-memory map is full: go through the state store instead.
      // NOTE(review): this call receives the addresses of pPkValBuff and pkValLen, so it may
      // replace the buffer that setValueBuff() just filled with lastTs; if so, the value
      // written back below is the stored one rather than the new one - confirm intent.
      (*pWinCode) = streamStateGetParTag_rocksdb(pTsDataState->pState, tableUid, &pTsDataState->pPkValBuff,
                                                 &pTsDataState->pkValLen);
      if ((*pWinCode) == TSDB_CODE_SUCCESS) {
        *pCurTs = *(TSKEY*)pTsDataState->pPkValBuff;
        if (hasPk) {
          (*ppCurPkVal) = POINTER_SHIFT(pTsDataState->pPkValBuff, sizeof(TSKEY));
        }
      }

      int32_t tmpCode = streamStatePutParTag_rocksdb(pTsDataState->pState, tableUid, pTsDataState->pPkValBuff,
                                                     pTsDataState->pkValLen);
      if (tmpCode != TSDB_CODE_SUCCESS) {
        // The write stays best-effort (the return code is unchanged) but is no longer silent.
        qError("%s failed to save ts data at line %d since %s. table id:%" PRIu64, __func__, __LINE__,
               tstrerror(tmpCode), tableUid);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1669

UNCOV
1670
// Writes every entry of pTableTsDataMap that carries the ts-data flag to the "partag"
// column family, using a write batch that is submitted whenever it reaches BATCH_LIMIT
// and once more at the end if anything is pending. The flag of each written entry is
// cleared before it is added to the batch.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
int32_t doTsDataCommit(STableTsDataState* pTsDataState) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   batch = NULL;
  char*   pTempBuf = NULL;

  batch = streamStateCreateBatch();
  QUERY_CHECK_NULL(batch, code, lino, _end, terrno);

  int     cfIdx = streamStateGetCfIdx(pTsDataState->pState, "partag");
  int32_t tempLen = (pTsDataState->pkValLen + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
  pTempBuf = taosMemoryCalloc(1, tempLen);
  QUERY_CHECK_NULL(pTempBuf, code, lino, _end, terrno);

  int32_t iter = 0;
  for (void* pEntry = tSimpleHashIterate(pTsDataState->pTableTsDataMap, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pEntry, &iter)) {
    // Submit the batch first when it is already full.
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
      code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
      streamStateClearBatch(batch);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (!HAS_TSDATA_FLAG(pEntry, pTsDataState->pkValLen)) {
      continue;
    }

    void* pTableKey = tSimpleHashGetKey(pEntry, NULL);
    UNSET_TSDATA_FLAG(pEntry, pTsDataState->pkValLen);
    code = streamStatePutBatchOptimize(pTsDataState->pState, cfIdx, batch, pTableKey, pEntry, pTsDataState->pkValLen,
                                       0, pTempBuf);
    QUERY_CHECK_CODE(code, lino, _end);
    // The scratch buffer is reset after every entry.
    memset(pTempBuf, 0, tempLen);
    qDebug("flush ts data,table id:%" PRIu64 , *(uint64_t*)pTableKey);
  }

  int32_t pending = streamStateGetBatchSize(batch);
  if (pending > 0) {
    code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(pTempBuf);
  streamStateDestroyBatch(batch);
  return code;
}
1717

UNCOV
1718
int32_t doRangeDataCommit(STableTsDataState* pTsDataState) {
×
UNCOV
1719
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1720
  int32_t lino = 0;
×
UNCOV
1721
  void*   batch = NULL;
×
1722

UNCOV
1723
  batch = streamStateCreateBatch();
×
UNCOV
1724
  QUERY_CHECK_NULL(batch, code, lino, _end, terrno);
×
UNCOV
1725
  int           idx = streamStateGetCfIdx(pTsDataState->pState, "sess");
×
UNCOV
1726
  int32_t       len = (pTsDataState->pkValLen + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
×
1727

UNCOV
1728
  int32_t size = taosArrayGetSize(pTsDataState->pScanRanges);
×
UNCOV
1729
  for (int32_t i = 0; i < size; i++) {
×
1730
    SScanRange* pRange = taosArrayGet(pTsDataState->pScanRanges, i);
×
1731
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
×
1732
      code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
×
1733
      streamStateClearBatch(batch);
×
1734
      QUERY_CHECK_CODE(code, lino, _end);
×
1735
    }
1736
    SSessionKey key = {.win = pRange->win, .groupId = 0};
×
1737
    int32_t     uidSize = tSimpleHashGetSize(pRange->pUIds);
×
1738
    int32_t     gpIdSize = tSimpleHashGetSize(pRange->pGroupIds);
×
1739
    int32_t     size = uidSize + gpIdSize;
×
1740
    uint64_t*   pIdBuf = (uint64_t*)taosMemoryCalloc(1, size);
×
1741
    void*       pIte = NULL;
×
1742
    int32_t     iter = 0;
×
1743
    int32_t     i = 0;
×
1744
    while ((pIte = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pIte, &iter)) != NULL) {
×
1745
      void* pTempKey = tSimpleHashGetKey(pIte, NULL);
×
1746
      pIdBuf[i] = *(uint64_t*)pTempKey;
×
1747
      i++;
×
1748
    }
1749

1750
    code = streamStatePutBatchOptimize(pTsDataState->pState, idx, batch, &key, (void*)pIdBuf, size, 0,
×
1751
                                       NULL);
1752
    QUERY_CHECK_CODE(code, lino, _end);
×
1753
  }
1754

UNCOV
1755
  int32_t numOfElems = streamStateGetBatchSize(batch);
×
UNCOV
1756
  if (numOfElems > 0) {
×
1757
    code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
×
1758
    QUERY_CHECK_CODE(code, lino, _end);
×
1759
  }
1760

UNCOV
1761
_end:
×
UNCOV
1762
  if (code != TSDB_CODE_SUCCESS) {
×
1763
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1764
  }
UNCOV
1765
  streamStateDestroyBatch(batch);
×
UNCOV
1766
  return code;
×
1767
}
1768

UNCOV
1769
// Allocates and initializes a STableTsDataState and returns it through ppTsDataState.
//
// pkType/pkLen: primary-key column type and length; pkLen == 0 means no primary key, in
//               which case no comparison function is installed.
// pState:       stored as pTsDataState->pState.
// pOtherState:  stored as pTsDataState->pStreamTaskState.
//
// Value buffers hold sizeof(TSKEY) + pkLen + sizeof(char) bytes.
// On failure everything allocated here is released, pState/pOtherState are left untouched,
// and *ppTsDataState is not written.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
int32_t initTsDataState(STableTsDataState** ppTsDataState, int8_t pkType, int32_t pkLen, void* pState, void* pOtherState) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;

  STableTsDataState* pTsDataState = taosMemoryCalloc(1, sizeof(STableTsDataState));
  QUERY_CHECK_NULL(pTsDataState, code, lino, _end, terrno);

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
  pTsDataState->pTableTsDataMap = tSimpleHashInit(DEFAULT_STATE_MAP_CAPACITY, hashFn);
  QUERY_CHECK_NULL(pTsDataState->pTableTsDataMap, code, lino, _end, terrno);

  pTsDataState->pkValLen = sizeof(TSKEY) + pkLen + sizeof(char);
  pTsDataState->pPkValBuff = taosMemoryCalloc(1, pTsDataState->pkValLen);
  QUERY_CHECK_NULL(pTsDataState->pPkValBuff, code, lino, _end, terrno);

  if (pkLen != 0) {
    pTsDataState->comparePkColFn = getKeyComparFunc(pkType, TSDB_ORDER_ASC);
  } else {
    pTsDataState->comparePkColFn = NULL;
  }

  pTsDataState->pScanRanges = taosArrayInit(64, sizeof(SScanRange));
  QUERY_CHECK_NULL(pTsDataState->pScanRanges, code, lino, _end, terrno);

  pTsDataState->pState = pState;
  pTsDataState->recValueLen = sizeof(SRecDataInfo) + pkLen;
  pTsDataState->pRecValueBuff = taosMemoryCalloc(1, pTsDataState->recValueLen);
  QUERY_CHECK_NULL(pTsDataState->pRecValueBuff, code, lino, _end, terrno);

  pTsDataState->curRecId = -1;

  pTsDataState->pStreamTaskState = pOtherState;

  pTsDataState->cfgIndex = streamStateGetCfIdx(pTsDataState->pState, "sess");
  pTsDataState->pBatch = streamStateCreateBatch();
  QUERY_CHECK_NULL(pTsDataState->pBatch, code, lino, _end, TSDB_CODE_FAILED);

  pTsDataState->batchBufflen = (pTsDataState->recValueLen + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
  pTsDataState->pBatchBuff = taosMemoryCalloc(1, pTsDataState->batchBufflen);
  // saveRecInfoToDisk() memsets this buffer, so a failed allocation must not go unnoticed.
  QUERY_CHECK_NULL(pTsDataState->pBatchBuff, code, lino, _end, terrno);

  (*ppTsDataState) = pTsDataState;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    if (pTsDataState != NULL) {
      // Release only what this function created; pState and pOtherState still belong
      // to the caller because initialization did not complete.
      if (pTsDataState->pBatch != NULL) {
        streamStateDestroyBatch(pTsDataState->pBatch);
      }
      if (pTsDataState->pScanRanges != NULL) {
        taosArrayDestroy(pTsDataState->pScanRanges);
      }
      if (pTsDataState->pTableTsDataMap != NULL) {
        tSimpleHashCleanup(pTsDataState->pTableTsDataMap);
      }
      taosMemoryFreeClear(pTsDataState->pBatchBuff);
      taosMemoryFreeClear(pTsDataState->pRecValueBuff);
      taosMemoryFreeClear(pTsDataState->pPkValBuff);
      taosMemoryFreeClear(pTsDataState);
    }
  }
  return code;
}
1817

1818
// Releases the two id sets held by a scan range and resets the range to an empty state
// (both window bounds set to INT64_MIN, both set pointers cleared).
static void destroyScanRange(SScanRange* pRange) {
  tSimpleHashCleanup(pRange->pUIds);
  tSimpleHashCleanup(pRange->pGroupIds);
  pRange->pUIds = NULL;
  pRange->pGroupIds = NULL;

  pRange->win.skey = INT64_MIN;
  pRange->win.ekey = INT64_MIN;
}
×
1826

UNCOV
1827
// Destroys a STableTsDataState: every scan range, the range array, the table map, the
// value buffers, the write batch and its scratch buffer, and finally the object itself.
// pState is freed here as well; pStreamTaskState is only cleared, not freed.
// Passing NULL is a no-op.
void destroyTsDataState(STableTsDataState* pTsDataState) {
  if (pTsDataState == NULL) {
    return;
  }

  SArray* pScanRanges = pTsDataState->pScanRanges;
  int32_t size = taosArrayGetSize(pScanRanges);
  for (int32_t i = 0; i < size; i++) {
    SScanRange* pRange = taosArrayGet(pScanRanges, i);
    destroyScanRange(pRange);
  }
  taosArrayDestroy(pTsDataState->pScanRanges);
  tSimpleHashCleanup(pTsDataState->pTableTsDataMap);
  taosMemoryFreeClear(pTsDataState->pPkValBuff);
  taosMemoryFreeClear(pTsDataState->pState);
  taosMemoryFreeClear(pTsDataState->pRecValueBuff);
  pTsDataState->pStreamTaskState = NULL;

  streamStateClearBatch(pTsDataState->pBatch);
  streamStateDestroyBatch(pTsDataState->pBatch);
  pTsDataState->pBatch = NULL;
  taosMemoryFreeClear(pTsDataState->pBatchBuff);

  taosMemoryFreeClear(pTsDataState);
}
×
1848

UNCOV
1849
// Rebuilds pTableTsDataMap from the entries stored in the state store.
//
// Every stored entry whose length equals pkValLen has its ts-data flag cleared and is put
// into the map keyed by table uid; entries of any other length are skipped.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when no cursor could be created, or
// the tSimpleHashPut() error code.
int32_t recoverTsData(STableTsDataState* pTsDataState) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamStateCur* pCur = createStateCursor(NULL);
  // The cursor is handed straight to the seek call below, so it must not be NULL.
  // NOTE(review): presumably creation only fails on allocation failure - confirm.
  QUERY_CHECK_NULL(pCur, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);

  streamStateParTagSeekKeyNext_rocksdb(pTsDataState->pState, INT64_MIN, pCur);
  while (1) {
    uint64_t tableUid = 0;
    void*    pVal = NULL;
    int32_t  len = 0;
    int32_t  winCode = streamStateParTagGetKVByCur_rocksdb(pCur, &tableUid, &pVal, &len);
    if (winCode != TSDB_CODE_SUCCESS) {
      break;
    }
    if (pTsDataState->pkValLen != len) {
      // Length mismatch: this entry does not fit the map's value size, skip it.
      taosMemoryFree(pVal);
      streamStateCurNext_rocksdb(pCur);
      continue;
    }
    UNSET_TSDATA_FLAG(pVal, len);
    code = tSimpleHashPut(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t), pVal, len);
    taosMemoryFree(pVal);
    QUERY_CHECK_CODE(code, lino, _end);
    streamStateCurNext_rocksdb(pCur);
  }

_end:
  if (pCur != NULL) {
    streamStateFreeCur(pCur);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1881

UNCOV
1882
// Creates a cursor positioned on the first entry of the hash returned by fn(pFileState).
// Returns NULL when the cursor cannot be created. pHashData is NULL when the hash yields
// no entry.
SStreamStateCur* getLastStateCur(SStreamFileState* pFileState, getStateBuffFn fn) {
  SStreamStateCur* pCur = createStateCursor(pFileState);
  if (pCur != NULL) {
    SSHashObj* pBuff = fn(pFileState);
    pCur->buffIndex = 0;
    pCur->hashIter = 0;
    // Start the iteration from the beginning of the hash.
    pCur->pHashData = tSimpleHashIterate(pBuff, NULL, &pCur->hashIter);
  }
  return pCur;
}
1894

UNCOV
1895
// Advances the cursor to the next entry of the hash returned by fn for the cursor's file state.
void moveLastStateCurNext(SStreamStateCur* pCur, getStateBuffFn fn) {
  SSHashObj* pBuff = fn(pCur->pStreamFileState);
  void*      pNext = tSimpleHashIterate(pBuff, pCur->pHashData, &pCur->hashIter);
  pCur->pHashData = pNext;
}
×
1899

UNCOV
1900
// Appends to pRes the row buffers of the last `num` keys of the first non-empty key array
// reachable from the cursor (fewer when the array holds less than `num` keys).
//
// The cursor is advanced past empty key arrays. A key whose row buffer lookup reports a
// non-success window code is logged, and whatever pointer was returned for it is still
// pushed to pRes.
// Returns TSDB_CODE_FAILED when the cursor is exhausted, TSDB_CODE_SUCCESS, or the error
// code of the failing step.
int32_t getNLastStateKVByCur(SStreamStateCur* pCur, int32_t num, SArray* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pKeys = NULL;
  int32_t total = 0;

  // Skip groups whose key array is empty.
  for (;;) {
    if (pCur->pHashData == NULL) {
      return TSDB_CODE_FAILED;
    }
    pKeys = *((void**)pCur->pHashData);
    total = taosArrayGetSize(pKeys);
    if (total > 0) {
      break;
    }
    moveLastStateCurNext(pCur, getSearchBuff);
  }

  int32_t first = TMAX(total - num, 0);
  int32_t pos = first;
  while (pos < total) {
    SWinKey* pKey = taosArrayGet(pKeys, pos);
    int32_t  valLen = 0;
    void*    pRow = NULL;
    int32_t  winCode = TSDB_CODE_SUCCESS;
    code = addRowBuffIfNotExist(pCur->pStreamFileState, (void*)pKey, sizeof(SWinKey), &pRow, &valLen, &winCode);
    QUERY_CHECK_CODE(code, lino, _end);

    if (winCode != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since window not exist. ts:%" PRId64 ",groupId:%" PRIu64, __func__, __LINE__,
             pKey->ts, pKey->groupId);
    }

    void* pPushed = taosArrayPush(pRes, &pRow);
    QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
    pos++;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1943

UNCOV
1944
// Rebuilds the table map: a fresh map is filled from the state store via recoverTsData(),
// then every entry of the current in-memory map is put on top of it, and finally the fresh
// map replaces the current one.
// On failure the current map is kept and the fresh map is discarded.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
int32_t reloadTsDataState(STableTsDataState* pTsDataState) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  // Work on a shallow copy so that recoverTsData() fills the new map, not the live one.
  STableTsDataState scratch = *pTsDataState;
  _hash_fn_t        hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
  scratch.pTableTsDataMap = tSimpleHashInit(DEFAULT_STATE_MAP_CAPACITY, hashFn);
  QUERY_CHECK_NULL(scratch.pTableTsDataMap, code, lino, _end, terrno);

  code = recoverTsData(&scratch);
  QUERY_CHECK_CODE(code, lino, _end);

  int32_t iter = 0;
  for (void* pEntry = tSimpleHashIterate(pTsDataState->pTableTsDataMap, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pEntry, &iter)) {
    size_t keyLen = 0;
    void*  pEntryKey = tSimpleHashGetKey(pEntry, &keyLen);
    code = tSimpleHashPut(scratch.pTableTsDataMap, pEntryKey, keyLen, pEntry, pTsDataState->pkValLen);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  tSimpleHashCleanup(pTsDataState->pTableTsDataMap);
  pTsDataState->pTableTsDataMap = scratch.pTableTsDataMap;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tSimpleHashCleanup(scratch.pTableTsDataMap);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1974

UNCOV
1975
// Adds one (session key, record info) pair to the state's pending write batch and submits
// the batch once it has reached BATCH_LIMIT.
//
// pKey:       session key; it is combined with the stream state's `number` as opNum.
// pVal/vLen:  value to store and its length.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
int32_t saveRecInfoToDisk(STableTsDataState* pTsDataState, SSessionKey* pKey, SRecDataInfo* pVal, int32_t vLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStateSessionKey recKey = {.key = *pKey, .opNum = ((SStreamState*)pTsDataState->pState)->number};
  code = streamStatePutBatchOptimize(pTsDataState->pState, pTsDataState->cfgIndex, pTsDataState->pBatch, &recKey, pVal,
                                     vLen, 0, pTsDataState->pBatchBuff);
  QUERY_CHECK_CODE(code, lino, _end);

  // The scratch buffer is reset after every put.
  memset(pTsDataState->pBatchBuff, 0, pTsDataState->batchBufflen);

  if (streamStateGetBatchSize(pTsDataState->pBatch) < BATCH_LIMIT) {
    goto _end;
  }

  code = streamStatePutBatch_rocksdb(pTsDataState->pState, pTsDataState->pBatch);
  streamStateClearBatch(pTsDataState->pBatch);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1998

UNCOV
1999
// Submits whatever is in the state's pending write batch and then clears the batch,
// regardless of the submit result. Returns the submit result.
int32_t flushRemainRecInfoToDisk(STableTsDataState* pTsDataState) {
  void*   pBatch = pTsDataState->pBatch;
  int32_t putRes = streamStatePutBatch_rocksdb(pTsDataState->pState, pBatch);

  streamStateClearBatch(pBatch);
  return putRes;
}
2004

UNCOV
2005
// Refills pWinStates with up to NUM_OF_CACHE_WIN keys of groupId read from the state store,
// walking backwards from the largest timestamp, then sorts the array with winKeyCmprImpl.
//
// The cursor is released on every path, including a failed array push.
// Returns TSDB_CODE_SUCCESS, or terrno when a key cannot be appended (in which case the
// array keeps the keys appended so far, unsorted).
int32_t recoverHashSortBuff(SStreamFileState* pFileState, SArray* pWinStates, uint64_t groupId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SWinKey          start = {.groupId = groupId, .ts = INT64_MAX};
  void*            pState = getStateFileStore(pFileState);
  SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, &start);
  for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
    SWinKey tmpKey = {.groupId = groupId};
    int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pState, pCur, &tmpKey, NULL, 0);
    if (tmpRes != TSDB_CODE_SUCCESS) {
      break;
    }
    void* tmp = taosArrayPush(pWinStates, &tmpKey);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    streamStateCurPrev_rocksdb(pCur);
  }
  // Keys were collected newest-first; put them in ascending order.
  taosArraySort(pWinStates, winKeyCmprImpl);

_end:
  // Freed after the label so that the error path above cannot leak the cursor.
  streamStateFreeCur(pCur);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2031

UNCOV
2032
// Collects into pResArray the row buffers of the keys that precede pKey in the group's
// cached key array, walking from the position found by the binary search towards the
// front. An entry equal to pKey is skipped.
//
// Nothing is collected when the group has no cached key array.
// NOTE(review): the loop runs while num <= maxNum, so up to maxNum + 1 rows may be
// collected - confirm whether that is intended.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
int32_t getRowStateAllPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SArray* pResArray, int32_t maxNum) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
  void*      pState = getStateFileStore(pFileState);
  void**     ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
  int32_t    num = 0;

  if (ppBuff == NULL) {
    goto _end;
  }

  SArray* pWinStates = (SArray*)(*ppBuff);
  int32_t size = taosArrayGetSize(pWinStates);
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
  while (index >= 0 && num <= maxNum) {
    SWinKey* pPrevKey = taosArrayGet(pWinStates, index);
    index--;
    if (winKeyCmprImpl(pPrevKey, pKey) != 0) {
      void*   pRow = NULL;
      int32_t rowLen = 0;
      int32_t winCode = TSDB_CODE_SUCCESS;
      code = addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), &pRow, &rowLen, &winCode);
      QUERY_CHECK_CODE(code, lino, _end);
      void* pPushed = taosArrayPush(pResArray, &pRow);
      QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
      num++;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2066

UNCOV
2067
// Stores `mode` for the given key in the file state's pRecFlagMap.
// Returns the tSimpleHashPut() result.
int32_t setStateRecFlag(SStreamFileState* pFileState, const void* pKey, int32_t keyLen, int32_t mode) {
  int32_t putRes = tSimpleHashPut(pFileState->pRecFlagMap, pKey, keyLen, &mode, sizeof(int32_t));
  return putRes;
}
2070

UNCOV
2071
// Reads the mode stored for the given key in the file state's pRecFlagMap.
// Returns TSDB_CODE_SUCCESS and writes *pMode when the key exists; returns
// TSDB_CODE_FAILED and leaves *pMode untouched otherwise.
int32_t getStateRecFlag(SStreamFileState* pFileState, const void* pKey, int32_t keyLen, int32_t* pMode) {
  void* pStored = tSimpleHashGet(pFileState->pRecFlagMap, pKey, keyLen);
  if (pStored != NULL) {
    *pMode = *(int32_t*) pStored;
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_FAILED;
}
2079

2080
// Drops the oldest session windows of every group in rowStateBuff, keeping the last
// numOfKeep entries of each group's array.
//
// numOfKeep:   number of trailing windows to keep. When minTs != INT64_MAX and a group
//              holds more than numOfKeep windows, numOfKeep is recomputed from the position
//              of minTs in that group's array (at least MIN_NUM_OF_SORT_CACHE_WIN).
//              NOTE(review): the recomputed value carries over to the groups visited
//              afterwards - confirm that this is intended.
// minTs:       timestamp used for the recomputation above.
// pFlushGroup: optional set of group ids. When not NULL, the first dropped window of every
//              group not yet in the set is queued for flushing (and the group is added to
//              the set) instead of being marked as flushed.
//
// Every dropped window is marked invalid and removed from pRecFlagMap; windows that are not
// queued are marked flushed and, when isFlushedState() reports them as flushed, removed
// from the state file. Errors are logged; the function returns nothing.
void clearExpiredSessionState(SStreamFileState* pFileState, int32_t numOfKeep, TSKEY minTs, SSHashObj* pFlushGroup) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pSessionBuff = pFileState->rowStateBuff;
  SStreamSnapshot* pFlushList = NULL;
  if (pFlushGroup != NULL) {
    pFlushList = tdListNew(POINTER_BYTES);
    // The list is appended to below whenever pFlushGroup is set, so it must exist.
    QUERY_CHECK_NULL(pFlushList, code, lino, _end, terrno);
  }
  void*      pIte = NULL;
  int32_t    iter = 0;
  while ((pIte = tSimpleHashIterate(pSessionBuff, pIte, &iter)) != NULL) {
    SArray* pWinStates = *((void**)pIte);
    int32_t arraySize = TARRAY_SIZE(pWinStates);
    if (minTs != INT64_MAX && arraySize > numOfKeep) {
      SSessionKey key = {.win.skey = minTs, .win.ekey = minTs};
      key.groupId = *(uint64_t*)tSimpleHashGetKey(pIte, NULL);
      int32_t index = binarySearch(pWinStates, arraySize, &key, fillStateKeyCompare);
      numOfKeep = TMAX(arraySize - index, MIN_NUM_OF_SORT_CACHE_WIN);
      qDebug("modify numOfKeep, numOfKeep:%d. %s at line %d", numOfKeep, __func__, __LINE__);
    }

    int32_t size = arraySize - numOfKeep;
    for (int32_t i = 0; i < size; i++) {
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
      SSessionKey* pKey = pPos->pKey;
      if (tSimpleHashGetSize(pFileState->pRecFlagMap) > 0) {
        tSimpleHashRemove(pFileState->pRecFlagMap, pKey, sizeof(SSessionKey));
      }
      pPos->invalid = true;

      if (i == 0 && pFlushGroup != NULL) {
        void* pGpVal = tSimpleHashGet(pFlushGroup, &pKey->groupId, sizeof(uint64_t));
        if (pGpVal == NULL) {
          // First dropped window of a group not seen yet: queue it for flushing.
          code = tdListAppend(pFlushList, &pPos);
          QUERY_CHECK_CODE(code, lino, _end);
          code = tSimpleHashPut(pFlushGroup, &pKey->groupId, sizeof(uint64_t), NULL, 0);
          QUERY_CHECK_CODE(code, lino, _end);
          continue;
        }
      }
      pPos->beFlushed = true;
      qTrace("clear expired session buff, ts:%" PRId64 ",groupid:%" PRIu64 ". %s at line %d", pKey->win.skey, pKey->groupId, __func__, __LINE__);

      if (isFlushedState(pFileState, pKey->win.skey, 0)) {
        int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->win.skey, __func__, __LINE__, code_file);
      }
    }
    taosArrayRemoveBatch(pWinStates, 0, size, NULL);
  }

  if (pFlushList != NULL) {
    flushSnapshot(pFileState, pFlushList, false);
    code = clearRowBuffNonFlush(pFileState);
    // NOTE(review): when this check fails, or when an append/put above fails, pFlushList is
    // not released - confirm whether the list can safely be freed on those paths.
    QUERY_CHECK_CODE(code, lino, _end);
    tdListFreeP(pFlushList, destroyRowBuffPosPtr);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc