• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.06
/source/libs/stream/src/streamState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamState.h"
17
#include "executor.h"
18
#include "osMemory.h"
19
#include "rocksdb/c.h"
20
#include "streamBackendRocksdb.h"
21
#include "streamInt.h"
22
#include "tcoding.h"
23
#include "tcommon.h"
24
#include "tcompare.h"
25
#include "tref.h"
26

27
#define MAX_TABLE_NAME_NUM 200000
28

29
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
2,497✔
30
  if (pWin1->groupId > pWin2->groupId) {
2,497!
31
    return 1;
×
32
  } else if (pWin1->groupId < pWin2->groupId) {
2,497!
UNCOV
33
    return -1;
×
34
  }
35

36
  if (pWin1->win.skey > pWin2->win.ekey) {
2,497✔
37
    return 1;
390✔
38
  } else if (pWin1->win.ekey < pWin2->win.skey) {
2,107✔
39
    return -1;
572✔
40
  }
41

42
  return 0;
1,535✔
43
}
44

45
int countRangeKeyEqual(const SSessionKey* pWin1, const SSessionKey* pWin2) {
23✔
46
  if (pWin1->groupId == pWin2->groupId && pWin1->win.skey <= pWin2->win.skey && pWin2->win.skey <= pWin1->win.ekey) {
23!
47
    return 0;
17✔
48
  }
49

50
  return 1;
6✔
51
}
52

53
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
170,370✔
54
  if (pWin1->groupId > pWin2->groupId) {
170,370✔
55
    return 1;
12,457✔
56
  } else if (pWin1->groupId < pWin2->groupId) {
157,913✔
57
    return -1;
22,034✔
58
  }
59

60
  if (pWin1->win.skey > pWin2->win.skey) {
135,879✔
61
    return 1;
7,721✔
62
  } else if (pWin1->win.skey < pWin2->win.skey) {
128,158✔
63
    return -1;
18,353✔
64
  }
65

66
  if (pWin1->win.ekey > pWin2->win.ekey) {
109,805✔
67
    return 1;
3,751✔
68
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
106,054✔
69
    return -1;
6,078✔
70
  }
71

72
  return 0;
99,976✔
73
}
74

75
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
163,947✔
76
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
163,947✔
77
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;
163,947✔
78

79
  if (pWin1->opNum > pWin2->opNum) {
163,947✔
80
    return 1;
788✔
81
  } else if (pWin1->opNum < pWin2->opNum) {
163,159✔
82
    return -1;
5,492✔
83
  }
84

85
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
157,667✔
86
}
87

88
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
33,109✔
89
  SStateKey* pWin1 = (SStateKey*)pKey1;
33,109✔
90
  SStateKey* pWin2 = (SStateKey*)pKey2;
33,109✔
91

92
  if (pWin1->opNum > pWin2->opNum) {
33,109✔
93
    return 1;
2,330✔
94
  } else if (pWin1->opNum < pWin2->opNum) {
30,779✔
95
    return -1;
1,525✔
96
  }
97

98
  return winKeyCmprImpl(&pWin1->key, &pWin2->key);
29,254✔
99
}
100

101
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
1,869✔
102
  int32_t code = TSDB_CODE_SUCCESS;
1,869✔
103
  int32_t lino = 0;
1,869✔
104

105
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
1,869✔
106
  stDebug("open stream state %p, %s", pState, path);
1,870✔
107

108
  if (pState == NULL) {
1,870!
109
    code = terrno;
×
110
    QUERY_CHECK_CODE(code, lino, _end);
×
111
  }
112

113
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
1,870✔
114
  if (pState->pTdbState == NULL) {
1,870!
115
    streamStateDestroy(pState, true);
×
116
    code = terrno;
×
117
    QUERY_CHECK_CODE(code, lino, _end);
×
118
  }
119

120
  SStreamTask* pStreamTask = pTask;
1,870✔
121
  pState->streamId = streamId;
1,870✔
122
  pState->taskId = taskId;
1,870✔
123
  TAOS_UNUSED(tsnprintf(pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr), "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId));
1,870✔
124

125
  code = streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr);
1,870✔
126
  QUERY_CHECK_CODE(code, lino, _end);
1,870!
127

128
  SStreamMeta* pMeta = pStreamTask->pMeta;
1,870✔
129
  pState->pTdbState->pOwner = pTask;
1,870✔
130
  pState->pFileState = NULL;
1,870✔
131
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
1,870✔
132
  pState->parNameMap = tSimpleHashInit(1024, hashFn);
1,870✔
133
  QUERY_CHECK_NULL(pState->parNameMap, code, lino, _end, terrno);
1,870!
134

135
  stInfo("open state %p on backend %p 0x%" PRIx64 "-%d succ", pState, pMeta->streamBackend, pState->streamId,
1,870!
136
         pState->taskId);
137
  return pState;
1,870✔
138

139
_end:
×
140
  if (code != TSDB_CODE_SUCCESS) {
×
141
    qError("0x%x %s failed at line %d since %s", taskId, __func__, lino, tstrerror(code));
×
142
  }
143
  return NULL;
×
144
}
145

146
// Drops the task-db reference held through the state's owner task and then
// frees the owner object itself. Always returns 0.
// NOTE(review): freeing pOwner presumably targets states prepared by
// streamStateCopyBackend(), which allocates its own owner - confirm with callers.
int32_t streamStateDelTaskDb(SStreamState* pState) {
  SStreamTask* pOwner = pState->pTdbState->pOwner;

  taskDbRemoveRef(pOwner->pBackend);
  taosMemoryFree(pOwner);

  return 0;
}
152
// Closes a stream state by handing it to streamStateDestroy(); `remove` is
// forwarded unchanged. pState must not be used after this call.
void streamStateClose(SStreamState* pState, bool remove) {
  // The former unused local `pTask` was dropped: it dereferenced
  // pState->pTdbState for no purpose.
  streamStateDestroy(pState, remove);
}
1,854✔
156

157
// No-op placeholder: performs no work and always returns 0.
int32_t streamStateBegin(SStreamState* pState) {
  return 0;
}
×
158

159
// Takes a snapshot of the file state and flushes it. Does nothing when the
// state has no file-state buffer attached.
void streamStateCommit(SStreamState* pState) {
  if (!pState->pFileState) {
    return;
  }

  SStreamSnapshot* pSnapshot = getSnapshot(pState->pFileState);
  flushSnapshot(pState->pFileState, pSnapshot, true);
}
471✔
165

166
// Copies `vLen` bytes of `value` into the tail section of the row buffer that
// belongs to `key`. The destination offset is (row size - select-row size).
// Returns the code from getFunctionRowBuff(); failures are logged.
// NOTE(review): vLen is not checked against the select-row size here - callers
// are presumably expected to pass a length that fits; confirm.
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pPos = NULL;

  const int32_t rowLen = getRowStateRowSize(pState->pFileState);
  int32_t       buffLen = rowLen;

  code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pPos, &buffLen);
  QUERY_CHECK_CODE(code, lino, _end);

  {
    char*   pRow = ((SRowBuffPos*)pPos)->pRowBuff;
    int32_t selectLen = streamFileStateGetSelectRowSize(pState->pFileState);
    memcpy(pRow + rowLen - selectLen, value, vLen);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
185
// Points *ppVal at the tail section of the row buffer that belongs to `key`
// (offset = row size - select-row size) and then releases the buffer position
// with used=false. pVLen is not written by this function.
// Returns the code from getFunctionRowBuff(); failures are logged.
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pPos = NULL;

  const int32_t rowLen = getRowStateRowSize(pState->pFileState);
  int32_t       buffLen = rowLen;

  code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pPos), &buffLen);
  QUERY_CHECK_CODE(code, lino, _end);

  {
    char*   pRow = ((SRowBuffPos*)pPos)->pRowBuff;
    int32_t selectLen = streamFileStateGetSelectRowSize(pState->pFileState);
    *ppVal = pRow + rowLen - selectLen;
    streamStateReleaseBuf(pState, pPos, false);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
205

206
// No-op placeholder: ignores all arguments and always returns 0.
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  return 0;
}
×
207

208
// Forwards to addRowBuffIfNotExist() on the state's file-state buffer using an
// SWinKey-sized key. pVal, pVLen and pWinCode are passed through unchanged.
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
  void*   pWinKey = (void*)key;
  int32_t code = addRowBuffIfNotExist(pState->pFileState, pWinKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
  return code;
}
211

212
// Reports whether hasRowBuff() finds a row buffer for `key` in the file state.
bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
  void* pWinKey = (void*)key;
  bool  found = hasRowBuff(pState->pFileState, pWinKey, sizeof(SWinKey));
  return found;
}
215

216
// Resolves a buffer position to its row data via getRowBuffByPos(), then
// releases the position with used=false. Returns getRowBuffByPos()'s code.
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
  const int32_t res = getRowBuffByPos(pState->pFileState, pos, pVal);

  streamStateReleaseBuf(pState, pos, false);

  return res;
}
221

222
// Removes the row buffer stored under `key` (an SWinKey) from the file state.
void streamStateDel(SStreamState* pState, const SWinKey* key) {
  const size_t keyLen = sizeof(SWinKey);
  deleteRowBuff(pState->pFileState, key, keyLen);
}
1,990✔
225

226
// Stores a fill value by forwarding directly to streamStateFillPut_rocksdb().
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  int32_t code = streamStateFillPut_rocksdb(pState, key, value, vLen);
  return code;
}
229

230
// Fetches a fill value. When a file-state buffer is attached, the lookup goes
// through getRowBuff() (which also receives pWinCode); otherwise it falls back
// to streamStateFillGet_rocksdb(), which is not passed pWinCode.
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
  if (!pState->pFileState) {
    return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
  }

  return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen, pWinCode);
}
236

237
// Forwards to getHashSortRowBuff() on the state's file-state buffer.
int32_t streamStateFillAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
                                     int32_t* pWinCode) {
  int32_t code = getHashSortRowBuff(pState->pFileState, key, pVal, pVLen, pWinCode);
  return code;
}
241

242
// Forwards to getHashSortNextRow() on the state's file-state buffer; the
// resulting key is written to pResKey by the callee.
int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
                               int32_t* pWinCode) {
  int32_t code = getHashSortNextRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);
  return code;
}
246

247
// Forwards to getHashSortPrevRow() on the state's file-state buffer; the
// resulting key is written to pResKey by the callee.
int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
                               int32_t* pWinCode) {
  int32_t code = getHashSortPrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);
  return code;
}
251

252
// Deletes a fill entry via streamStateFillDel_rocksdb(). The result code is
// only traced, never returned to the caller.
void streamStateFillDel(SStreamState* pState, const SWinKey* key) {
  const int32_t res = streamStateFillDel_rocksdb(pState, key);
  qTrace("%s at line %d res %d", __func__, __LINE__, res);
}
60✔
256

257
// Clears the state's file-state buffer via streamFileStateClear().
void streamStateClear(SStreamState* pState) {
  streamFileStateClear(pState->pFileState);
}
1,227✔
258

259
// Records the caller-supplied `number` and timestamp column index on the state.
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex) {
  pState->tsIndex = tsIdex;
  pState->number = number;
}
3,065✔
263

264
// Writes one key/value pair to the "default" column family through a
// temporary batch: create batch -> add entry -> write batch -> destroy batch.
// Failures are logged only; nothing is returned. keyLen is not used.
// NOTE(review): the batch handle is not checked for NULL before use - confirm
// whether streamStateCreateBatch() can fail.
void streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pBatch = streamStateCreateBatch();

  code = streamStatePutBatch(pState, "default", pBatch, pKey, pVal, vLen, 0);
  QUERY_CHECK_CODE(code, lino, _end);

  code = streamStatePutBatch_rocksdb(pState, pBatch);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  // The batch is released on both the success and the failure path.
  streamStateDestroyBatch(pBatch);
}
398✔
282

283
// Reads a value through streamDefaultGet_rocksdb() and returns its result
// code. keyLen is not used.
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen) {
  return streamDefaultGet_rocksdb(pState, pKey, pVal, pLen);
}
288

289
// Gets (or creates) the row buffer for `key` via streamStateGet(). When the
// file state exposes a search buffer, the key is additionally registered in the
// per-group array of that buffer. Returns the first failing code; failures are
// logged.
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
                                 int32_t* pWinCode) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pSearch = NULL;
  SArray*    pGroupWins = NULL;

  code = streamStateGet(pState, key, pVal, pVLen, pWinCode);
  QUERY_CHECK_CODE(code, lino, _end);

  pSearch = getSearchBuff(pState->pFileState);
  if (pSearch == NULL) {
    goto _end;
  }

  code = addArrayBuffIfNotExist(pSearch, key->groupId, &pGroupWins);
  QUERY_CHECK_CODE(code, lino, _end);

  code = addSearchItem(pState->pFileState, pGroupWins, key);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
310

311
// Releases a buffer position through streamFileStateReleaseBuff(). A NULL pVal
// is ignored.
void streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) {
  if (pVal) {
    streamFileStateReleaseBuff(pState->pFileState, pVal, used);
  }
}
317

318
// Forwards pVal to streamFileStateClearBuff() on the state's file-state buffer.
void streamStateClearBuff(SStreamState* pState, void* pVal) {
  streamFileStateClearBuff(pState->pFileState, pVal);
}
×
319

320
// Returns the cursor produced by streamStateFillGetCur_rocksdb() for `key`.
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
  return pCur;
}
323

324
// Returns the cursor produced by streamStateGetAndCheckCur_rocksdb() for `key`.
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
  SStreamStateCur* pCur = streamStateGetAndCheckCur_rocksdb(pState, key);
  return pCur;
}
327

328
// Reads the key/value at the cursor through streamStateGetKVByCur_rocksdb(),
// using the store obtained from the cursor's file state.
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  int32_t code = streamStateGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, pVal, pVLen);
  return code;
}
331

332
// Reads the fill key/value at the cursor via streamStateFillGetKVByCur_rocksdb().
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
  return code;
}
335

336
// Reads the fill key/value at the cursor via
// streamStateFillGetGroupKVByCur_rocksdb().
int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  int32_t code = streamStateFillGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
  return code;
}
339

340
// Returns the cursor produced by streamStateSeekKeyNext_rocksdb() for `key`.
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, key);
  return pCur;
}
343

344
// Returns the cursor produced by streamStateFillSeekKeyNext_rocksdb() for `key`.
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, key);
  return pCur;
}
347

348
// Returns the cursor produced by streamStateFillSeekKeyPrev_rocksdb() for `key`.
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, key);
  return pCur;
}
351

352
// Advances the cursor with sessionWinStateMoveToNext(). pState is not used.
void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
  sessionWinStateMoveToNext(pCur);
}
1,251✔
353

354
// Moves the cursor backwards with streamStateCurPrev_rocksdb(). pState is not
// used.
void streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
  // The trace text previously said "next", which contradicted the direction of
  // the move and made traces misleading.
  qTrace("move cursor to prev");
  streamStateCurPrev_rocksdb(pCur);
}
×
358

359
// Releases the RocksDB resources a cursor holds (iterator, snapshot, read
// options), zeroes the whole cursor and sets buffIndex back to -1. The cursor
// memory itself is not freed. A NULL cursor is ignored.
void streamStateResetCur(SStreamStateCur* pCur) {
  if (pCur == NULL) {
    return;
  }

  if (pCur->iter) {
    rocksdb_iter_destroy(pCur->iter);
  }
  if (pCur->snapshot) {
    rocksdb_release_snapshot(pCur->db, pCur->snapshot);
  }
  if (pCur->readOpt) {
    rocksdb_readoptions_destroy(pCur->readOpt);
  }

  memset(pCur, 0, sizeof(*pCur));
  pCur->buffIndex = -1;
}
371

372
// Resets the cursor (see streamStateResetCur) and frees its memory. A NULL
// cursor is ignored.
void streamStateFreeCur(SStreamStateCur* pCur) {
  if (pCur != NULL) {
    streamStateResetCur(pCur);
    taosMemoryFree(pCur);
  }
}
379

380
// Frees a value buffer with taosMemoryFree().
void streamStateFreeVal(void* val) {
  taosMemoryFree(val);
}
82,549✔
381

382
// Persists a session window result. `value` is treated as an SRowBuffPos*.
//  - needFree not set: nothing to do.
//  - window not yet flushed (per isFlushedState): clear beFlushed and hand the
//    position to putSessionWinResultBuff().
//  - window already flushed: write the row to RocksDB, release the position
//    with used=true and return it via putFreeBuff(). A position without a row
//    buffer is skipped.
// Returns the first failing code; failures are logged.
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SRowBuffPos* pRowPos = (SRowBuffPos*)value;

  if (!pRowPos->needFree) {
    goto _end;
  }

  if (!isFlushedState(pState->pFileState, key->win.ekey, 0)) {
    pRowPos->beFlushed = false;
    code = putSessionWinResultBuff(pState->pFileState, value);
    QUERY_CHECK_CODE(code, lino, _end);
    goto _end;
  }

  if (!pRowPos->pRowBuff) {
    goto _end;
  }

  code = streamStateSessionPut_rocksdb(pState, key, pRowPos->pRowBuff, vLen);
  QUERY_CHECK_CODE(code, lino, _end);

  streamStateReleaseBuf(pState, pRowPos, true);
  code = putFreeBuff(pState->pFileState, pRowPos);
  QUERY_CHECK_CODE(code, lino, _end);

  stDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
          key->win.ekey, key->groupId, code);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
413

414
int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur,
42✔
415
                                                     const SSessionKey* pKey, void** pVal, int32_t* pVLen) {
416
  return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen);
42✔
417
}
418

419
// Forwards to getSessionFlushedBuff() on the state's file-state buffer.
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
  int32_t code = getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen, pWinCode);
  return code;
}
422

423
// Logs the session key being removed and deletes its row buffer from the file
// state, using an SSessionKey-sized key.
void streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
  const size_t keyLen = sizeof(SSessionKey);

  qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
         key->groupId);
  deleteRowBuff(pState->pFileState, key, keyLen);
}
767✔
428

429
// Zero-fills pVal over the row size reported by getRowStateRowSize().
void streamStateSessionReset(SStreamState* pState, void* pVal) {
  int32_t rowLen = getRowStateRowSize(pState->pFileState);

  memset(pVal, 0, rowLen);
}
4✔
433

434
// Returns the cursor from sessionWinStateSeekKeyCurrentPrev() on the file state.
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
  SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pState->pFileState, key);
  return pCur;
}
437

438
// Returns the cursor from sessionWinStateSeekKeyCurrentNext() on the file state.
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
  SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentNext(pState->pFileState, key);
  return pCur;
}
441

442
// Returns the cursor from sessionWinStateSeekKeyNext() on the file state.
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
  SStreamStateCur* pCur = sessionWinStateSeekKeyNext(pState->pFileState, key);
  return pCur;
}
445

446
// Returns the cursor from countWinStateSeekKeyPrev() on the file state; `count`
// is passed through unchanged.
SStreamStateCur* streamStateCountSeekKeyPrev(SStreamState* pState, const SSessionKey* key, COUNT_TYPE count) {
  SStreamStateCur* pCur = countWinStateSeekKeyPrev(pState->pFileState, key, count);
  return pCur;
}
449

450
// Reads the session key/value at the cursor via sessionWinStateGetKVByCur().
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
  int32_t code = sessionWinStateGetKVByCur(pCur, pKey, pVal, pVLen);
  return code;
}
453

454
// Clears session windows in two steps, in this order: the file-state buffer
// (sessionWinStateClear) and then the RocksDB side
// (streamStateSessionClear_rocksdb).
void streamStateSessionClear(SStreamState* pState) {
  sessionWinStateClear(pState->pFileState);

  streamStateSessionClear_rocksdb(pState);
}
138✔
458

459
// Calls sessionWinStateGetKeyByRange() with sessionRangeKeyCmpr as the
// comparator; the callee receives curKey as its output argument.
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
  int32_t code = sessionWinStateGetKeyByRange(pState->pFileState, key, curKey, sessionRangeKeyCmpr);
  return code;
}
462

463
// Calls sessionWinStateGetKeyByRange() with countRangeKeyEqual as the
// comparator; the callee receives curKey as its output argument.
int32_t streamStateCountGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
  int32_t code = sessionWinStateGetKeyByRange(pState->pFileState, key, curKey, countRangeKeyEqual);
  return code;
}
466

467
// Forwards to getSessionWinResultBuff() on the state's file-state buffer; `gap`
// is passed through unchanged.
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen,
                                        int32_t* pWinCode) {
  int32_t code = getSessionWinResultBuff(pState->pFileState, key, gap, pVal, pVLen, pWinCode);
  return code;
}
471

472
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
1,556✔
473
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
474
  return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen, pWinCode);
1,556✔
475
}
476

477
// Records the table name for `groupId` unless the in-memory map already has an
// entry for it. The name goes into the map only while the map holds fewer than
// MAX_TABLE_NAME_NUM entries, and is then written to RocksDB.
// Returns the first failing code; failures are logged.
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) != NULL) {
    goto _end;  // already cached: nothing is written
  }

  if (tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
    code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = streamStatePutParName_rocksdb(pState, groupId, tbname);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
495

496
// Looks up the table name for `groupId`.
//  - Cache hit: *pVal receives a newly allocated TSDB_TABLE_NAME_LEN-byte copy
//    and *pWinCode is set to TSDB_CODE_SUCCESS.
//  - Cache miss with onlyCache set while the map is below MAX_TABLE_NAME_NUM:
//    *pWinCode is set to TSDB_CODE_FAILED and RocksDB is not consulted.
//  - Otherwise: *pWinCode takes the RocksDB lookup result; a successful lookup
//    is added to the map while the map is below MAX_TABLE_NAME_NUM.
// The return value reports internal failures (allocation, cache insert) only;
// whether a name was found is reported through *pWinCode.
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache, int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pCached = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));

  if (pCached) {
    *pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
    if (!(*pVal)) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    memcpy(*pVal, pCached, TSDB_TABLE_NAME_LEN);
    (*pWinCode) = TSDB_CODE_SUCCESS;
    goto _end;
  }

  if (onlyCache && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
    (*pWinCode) = TSDB_CODE_FAILED;
    goto _end;
  }

  (*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal);
  if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
    code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), *pVal, TSDB_TABLE_NAME_LEN);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
527

UNCOV
528
// Removes the table name for `groupId` from the in-memory map and then from
// RocksDB. Both removals are best-effort: a failure is logged as a warning and
// the function still returns TSDB_CODE_SUCCESS.
int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId) {
  int32_t cacheCode = tSimpleHashRemove(pState->parNameMap, &groupId, sizeof(int64_t));
  if (cacheCode != TSDB_CODE_SUCCESS) {
    qWarn("failed to remove parname from cache, code:%d", cacheCode);
  }

  int32_t dbCode = streamStateDeleteParName_rocksdb(pState, groupId);
  if (dbCode != TSDB_CODE_SUCCESS) {
    qWarn("failed to remove parname from rocksdb, code:%d", dbCode);
  }

  return TSDB_CODE_SUCCESS;
}
539

540
// Tears down a stream state: destroys the file-state buffer, cleans up the
// partition-name map, then frees the STdbState and the state object itself.
// `remove` is currently not used by this function.
void streamStateDestroy(SStreamState* pState, bool remove) {
  streamFileStateDestroy(pState->pFileState);
  tSimpleHashCleanup(pState->parNameMap);

  // No RocksDB-side teardown happens here; only the memory owned by the state
  // is released.
  taosMemoryFreeClear(pState->pTdbState);
  taosMemoryFreeClear(pState);
}
1,854✔
548

549
// Forwards to deleteExpiredCheckPoint() on the file state with the given mark.
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
  int32_t code = deleteExpiredCheckPoint(pState->pFileState, mark);
  return code;
}
552

553
// Forwards `ts` to streamFileStateReloadInfo() on the state's file-state buffer.
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) {
  streamFileStateReloadInfo(pState->pFileState, ts);
}
119✔
554

555
// Makes `dst` share `src`'s backend: copies the file-state and partition-name
// map pointers, the ids, the result-row callbacks and pExprSupp, marks dst as
// a dump (dump = 1) and points dst's owner task at src's backend.
//
// If dst has no STdbState yet, one is allocated together with a fresh owner
// task object. If either allocation fails, the error is logged and dst is left
// completely untouched; the function cannot report this to the caller because
// it returns void.
void streamStateCopyBackend(SStreamState* src, SStreamState* dst) {
  if (dst->pTdbState == NULL) {
    // Allocate everything up front so a failure never leaves dst half-copied
    // and never leads to a NULL dereference below.
    STdbState* pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
    if (pTdbState == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return;
    }

    pTdbState->pOwner = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTdbState->pOwner == NULL) {
      int32_t code = terrno;  // capture before the free below can change it
      taosMemoryFree(pTdbState);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return;
    }

    dst->pTdbState = pTdbState;
  }

  dst->pFileState = src->pFileState;
  dst->parNameMap = src->parNameMap;
  dst->number = src->number;
  dst->taskId = src->taskId;
  dst->streamId = src->streamId;
  dst->dump = 1;
  dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend;
  dst->pResultRowStore.resultRowPut = src->pResultRowStore.resultRowPut;
  dst->pResultRowStore.resultRowGet = src->pResultRowStore.resultRowGet;
  dst->pExprSupp = src->pExprSupp;
}
572
SStreamStateCur* createStreamStateCursor() {
9,957✔
573
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
9,957✔
574
  if (pCur == NULL) {
9,957!
575
    return NULL;
×
576
  }
577
  pCur->buffIndex = -1;
9,957✔
578
  return pCur;
9,957✔
579
}
580

581
// count window
582
// Forwards to getCountWinResultBuff() on the state's file-state buffer;
// `winCount` is passed through unchanged.
int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal,
                                         int32_t* pVLen, int32_t* pWinCode) {
  int32_t code = getCountWinResultBuff(pState->pFileState, pKey, winCount, ppVal, pVLen, pWinCode);
  return code;
}
586

587
// Forwards to createCountWinResultBuff() on the state's file-state buffer;
// `winCount` is passed through unchanged.
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
  int32_t code = createCountWinResultBuff(pState->pFileState, pKey, winCount, pVal, pVLen);
  return code;
}
590

591
// Forwards to streamFileStateGroupPut() on the state's file-state buffer.
int32_t streamStateGroupPut(SStreamState* pState, int64_t groupId, void* value, int32_t vLen) {
  int32_t code = streamFileStateGroupPut(pState->pFileState, groupId, value, vLen);
  return code;
}
594

595
// Creates a cursor positioned on the first group id.
// The in-memory group-id cache is tried first. When that yields no entry,
// hashIter is set to -1 and the cursor is positioned through
// streamStateParTagSeekKeyNext_rocksdb() starting at INT64_MIN.
// Returns NULL if the cursor could not be created.
SStreamStateCur* streamStateGroupGetCur(SStreamState* pState) {
  SStreamStateCur* pCur = createStateCursor(pState->pFileState);
  // NOTE(review): createStateCursor() is defined elsewhere; the sibling
  // createStreamStateCursor() returns NULL on allocation failure, so guard
  // against NULL before touching the cursor.
  if (pCur == NULL) {
    return NULL;
  }

  pCur->hashIter = 0;
  pCur->pHashData = NULL;

  SSHashObj* pMap = getGroupIdCache(pState->pFileState);
  pCur->pHashData = tSimpleHashIterate(pMap, pCur->pHashData, &pCur->hashIter);
  if (pCur->pHashData == NULL) {
    pCur->hashIter = -1;
    streamStateParTagSeekKeyNext_rocksdb(pState, INT64_MIN, pCur);
  }
  return pCur;
}
607

608
// Advances a group cursor via streamFileStateGroupCurNext().
void streamStateGroupCurNext(SStreamStateCur* pCur) {
  streamFileStateGroupCurNext(pCur);
}
217✔
611

612
// Reads the group id at the cursor via streamFileStateGroupGetKVByCur().
// A non-NULL pVal is rejected with -1, so callers must pass pVal == NULL.
// NOTE(review): presumably group entries carry no value payload, which would
// explain the rejection - confirm against streamFileStateGroupGetKVByCur().
int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
  if (pVal == NULL) {
    return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen);
  }
  return -1;
}
618

619
// Forwards to clearExpiredState() on the state's file-state buffer.
void streamStateClearExpiredState(SStreamState* pState) {
  clearExpiredState(pState->pFileState);
}
564✔
622

623
// Forwards to setFillInfo() on the state's file-state buffer.
void streamStateSetFillInfo(SStreamState* pState) {
  setFillInfo(pState->pFileState);
}
14✔
626

627
// Forwards to getRowStatePrevRow() on the state's file-state buffer; the
// resulting key is written to pResKey by the callee.
int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
                           int32_t* pWinCode) {
  int32_t code = getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc