• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3942

25 Apr 2025 11:21AM UTC coverage: 62.853% (+0.3%) from 62.507%
#3942

push

travis-ci

web-flow
docs: jdbc tmq supports database subscription. [TS-6222] (#30819)

* docs: jdbc tmq supports database subscription. [TS-6222]

* Update docs/zh/07-develop/07-tmq.md

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* Update 07-tmq.md

---------

Co-authored-by: haoranchen <haoran920c@163.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

156603 of 317531 branches covered (49.32%)

Branch coverage included in aggregate %.

241895 of 316485 relevant lines covered (76.43%)

6664240.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.73
/source/libs/stream/src/streamBackendRocksdb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "lz4.h"
18
#include "streamInt.h"
19
#include "tcommon.h"
20
#include "tref.h"
21

22
/*
 * State object handed to the compaction-filter factory callbacks.
 * `status` is an opaque pointer; how it is used is not visible in this part of the file.
 */
typedef struct SCompactFilteFactory {
  void* status;
} SCompactFilteFactory;
25

26
typedef struct {
27
  rocksdb_t*                       db;
28
  rocksdb_column_family_handle_t** pHandle;
29
  rocksdb_writeoptions_t*          wOpt;
30
  rocksdb_readoptions_t*           rOpt;
31
  rocksdb_options_t**              cfOpt;
32
  rocksdb_options_t*               dbOpt;
33
  RocksdbCfParam*                  param;
34
  void*                            pBackend;
35
  SListNode*                       pCompareNode;
36
  rocksdb_comparator_t**           pCompares;
37
} RocksdbCfInst;
38

39
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
40

41
void            destroyRocksdbCfInst(RocksdbCfInst* inst);
42
int32_t         getCfIdx(const char* cfName);
43
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath);
44

45
static int32_t backendCopyFiles(const char* src, const char* dst);
46

47
void        destroyCompactFilteFactory(void* arg);
48
void        destroyCompactFilte(void* arg);
49
const char* compactFilteFactoryName(void* arg);
50
const char* compactFilteFactoryNameSess(void* arg);
51
const char* compactFilteFactoryNameState(void* arg);
52
const char* compactFilteFactoryNameFunc(void* arg);
53
const char* compactFilteFactoryNameFill(void* arg);
54

55
const char* compactFilteName(void* arg);
56
const char* compactFilteNameSess(void* arg);
57
const char* compactFilteNameState(void* arg);
58
const char* compactFilteNameFill(void* arg);
59
const char* compactFilteNameFunc(void* arg);
60

61
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
62
                           char** newval, size_t* newvlen, unsigned char* value_changed);
63
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);
64
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx);
65
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx);
66
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx);
67
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx);
68

69
typedef int (*__db_key_encode_fn_t)(void* key, char* buf);
70
typedef int (*__db_key_decode_fn_t)(void* key, char* buf);
71
typedef int (*__db_key_tostr_fn_t)(void* key, char* buf);
72
typedef const char* (*__db_key_cmpname_fn_t)(void* statue);
73
typedef int (*__db_key_cmp_fn_t)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
74
typedef void (*__db_key_cmp_destroy_fn_t)(void* state);
75
typedef int32_t (*__db_value_encode_fn_t)(void* value, int32_t vlen, int64_t ttl, char** dest);
76
typedef int32_t (*__db_value_decode_fn_t)(void* value, int32_t vlen, int64_t* ttl, char** dest);
77

78
typedef rocksdb_compactionfilter_t* (*__db_factory_create_fn_t)(void* arg, rocksdb_compactionfiltercontext_t* ctx);
79
typedef const char* (*__db_factory_name_fn_t)(void* arg);
80
typedef void (*__db_factory_destroy_fn_t)(void* arg);
81
typedef struct {
82
  const char*               key;
83
  int32_t                   len;
84
  int                       idx;
85
  __db_key_cmp_fn_t         cmpKey;
86
  __db_key_encode_fn_t      enFunc;
87
  __db_key_decode_fn_t      deFunc;
88
  __db_key_tostr_fn_t       toStrFunc;
89
  __db_key_cmpname_fn_t     cmpName;
90
  __db_key_cmp_destroy_fn_t destroyCmp;
91
  __db_value_encode_fn_t    enValueFunc;
92
  __db_value_decode_fn_t    deValueFunc;
93

94
  __db_factory_create_fn_t  createFilter;
95
  __db_factory_destroy_fn_t destroyFilter;
96
  __db_factory_name_fn_t    funcName;
97

98
} SCfInit;
99

100
const char* compareDefaultName(void* name);
101
const char* compareStateName(void* name);
102
const char* compareWinKeyName(void* name);
103
const char* compareSessionKeyName(void* name);
104
const char* compareFuncKeyName(void* name);
105
const char* compareParKeyName(void* name);
106
const char* comparePartagKeyName(void* name);
107

108
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
109
int defaultKeyEncode(void* k, char* buf);
110
int defaultKeyDecode(void* k, char* buf);
111
int defaultKeyToString(void* k, char* buf);
112

113
int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
114
int stateKeyEncode(void* k, char* buf);
115
int stateKeyDecode(void* k, char* buf);
116
int stateKeyToString(void* k, char* buf);
117

118
int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
119
int stateSessionKeyEncode(void* ses, char* buf);
120
int stateSessionKeyDecode(void* ses, char* buf);
121
int stateSessionKeyToString(void* k, char* buf);
122

123
int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
124
int winKeyEncode(void* k, char* buf);
125
int winKeyDecode(void* k, char* buf);
126
int winKeyToString(void* k, char* buf);
127

128
int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
129
int tupleKeyEncode(void* k, char* buf);
130
int tupleKeyDecode(void* k, char* buf);
131
int tupleKeyToString(void* k, char* buf);
132

133
int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
134
int parKeyEncode(void* k, char* buf);
135
int parKeyDecode(void* k, char* buf);
136
int parKeyToString(void* k, char* buf);
137

138
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest);
139
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest);
140
int32_t valueToString(void* k, char* buf);
141
int32_t valueIsStale(void* k, int64_t ts);
142

143
void        destroyCompare(void* arg);
144
static void cleanDir(const char* pPath, const char* id);
145

146
static bool                streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
147
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
148
                                                 rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
149

150
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
151
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
152

153
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId);
154
int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId);
155

156
int32_t  copyFiles(const char* src, const char* dst);
157
uint32_t nextPow2(uint32_t x);
158

159
SCfInit ginitDict[] = {
160
    {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
161
     destroyCompare, valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory,
162
     compactFilteFactoryName},
163

164
    {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyCompare,
165
     valueEncode, valueDecode, compactFilteFactoryCreateFilterState, destroyCompactFilteFactory,
166
     compactFilteFactoryNameState},
167

168
    {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyCompare,
169
     valueEncode, valueDecode, compactFilteFactoryCreateFilterFill, destroyCompactFilteFactory,
170
     compactFilteFactoryNameFill},
171

172
    {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
173
     compareSessionKeyName, destroyCompare, valueEncode, valueDecode, compactFilteFactoryCreateFilterSess,
174
     destroyCompactFilteFactory, compactFilteFactoryNameSess},
175

176
    {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyCompare,
177
     valueEncode, valueDecode, compactFilteFactoryCreateFilterFunc, destroyCompactFilteFactory,
178
     compactFilteFactoryNameFunc},
179

180
    {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyCompare,
181
     valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
182

183
    {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyCompare,
184
     valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
185
};
186

187
int32_t getCfIdx(const char* cfName) {
5,448✔
188
  int    idx = -1;
5,448✔
189
  size_t len = strlen(cfName);
5,448✔
190
  for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
5,534!
191
    if (len == ginitDict[i].len && strncmp(cfName, ginitDict[i].key, strlen(cfName)) == 0) {
5,534✔
192
      idx = i;
5,448✔
193
      break;
5,448✔
194
    }
195
  }
196
  return idx;
5,448✔
197
}
198

199
/*
 * Checkpoint directory validation hook.
 * Validation is not implemented yet: every directory is reported as valid.
 *
 * @param dir checkpoint directory (currently ignored).
 * @return always true.
 */
bool isValidCheckpoint(const char* dir) {
  (void)dir;
  return true;
}
203

204
/*
205
 *copy pChkpIdDir's file to state dir
206
 */
207
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
2✔
208
  // impl later
209
  int32_t code = 0;
2✔
210
  int32_t cap = strlen(path) + 64;
2✔
211
  int32_t nBytes = 0;
2✔
212

213
  char* state = taosMemoryCalloc(1, cap);
2!
214
  if (state == NULL) {
2!
215
    return terrno;
×
216
  }
217

218
  nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state");
2✔
219
  if (nBytes <= 0 || nBytes >= cap) {
2!
220
    taosMemoryFree(state);
×
221
    return TSDB_CODE_OUT_OF_RANGE;
×
222
  }
223

224
  if (chkpId != 0) {
2!
225
    char* chkp = taosMemoryCalloc(1, cap);
2!
226
    if (chkp == NULL) {
2!
227
      taosMemoryFree(state);
×
228
      return terrno;
×
229
    }
230

231
    nBytes = snprintf(chkp, cap, "%s%s%s%scheckpoint%" PRId64, path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
2✔
232
    if (nBytes <= 0 || nBytes >= cap) {
2!
233
      taosMemoryFree(state);
×
234
      taosMemoryFree(chkp);
×
235
      return TSDB_CODE_OUT_OF_RANGE;
×
236
    }
237

238
    if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
2!
239
      cleanDir(state, "");
×
240
      code = backendCopyFiles(chkp, state);
×
241
      if (code != 0) {
×
242
        stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(code)));
×
243
      } else {
244
        stInfo("start to restart stream backend at checkpoint path: %s", chkp);
×
245
      }
246

247
    } else {
248
      stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp,
2!
249
              tstrerror(terrno), state);
250
      code = taosMkDir(state);
2✔
251
      if (code != 0) {
2!
252
        code = TAOS_SYSTEM_ERROR(ERRNO);
×
253
      }
254
    }
255

256
    taosMemoryFree(chkp);
2!
257
  }
258

259
  *dst = state;
2✔
260
  return code;
2✔
261
}
262

263
/*
 * Parsed content of a checkpoint "META" file.
 * remoteChkp_readMetaData() fills the six fields, in declaration order, with one sscanf() call
 * using META_ON_S3_FORMATE.
 */
typedef struct {
  char    pCurrName[24];      // name part of the "current" file entry
  int64_t currChkptId;        // checkpoint id attached to the "current" entry

  char    pManifestName[24];  // name part of the "manifest" file entry
  int64_t manifestChkptId;    // checkpoint id attached to the "manifest" entry

  char    processName[24];
  int64_t processId;          // forwarded to chkpAddExtraInfo() by rebuildDataFromS3()
} SSChkpMetaOnS3;
273

274
/*
 * Read and parse "<path>/META" into a freshly allocated SSChkpMetaOnS3.
 *
 * Fixes over the previous version:
 *  - the META file itself is opened (previously `path`, i.e. the directory, was opened and
 *    the computed metaPath was never used);
 *  - at most sizeof(buf) - 1 bytes are read so the buffer handed to sscanf() is always
 *    NUL-terminated;
 *  - a truncated path is reported as TSDB_CODE_OUT_OF_RANGE, like every other snprintf check
 *    in this file.
 *
 * @param path  directory that contains the META file.
 * @param pMeta receives the parsed record on success (caller frees with taosMemoryFree);
 *              left untouched on failure.
 * @return 0 on success; terrno of the failing call, TSDB_CODE_OUT_OF_RANGE, or
 *         TSDB_CODE_INVALID_MSG when the file content is malformed or the "current" and
 *         "manifest" checkpoint ids disagree.
 */
int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) {
  int32_t         code = 0;
  int32_t         cap = strlen(path) + 32;
  TdFilePtr       pFile = NULL;
  SSChkpMetaOnS3* p = NULL;
  char            buf[256] = {0};

  char* metaPath = taosMemoryCalloc(1, cap);
  if (metaPath == NULL) {
    return terrno;
  }

  int32_t n = snprintf(metaPath, cap, "%s%s%s", path, TD_DIRSEP, "META");
  if (n <= 0 || n >= cap) {
    taosMemoryFree(metaPath);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  pFile = taosOpenFile(metaPath, TD_FILE_READ);
  if (pFile == NULL) {
    code = terrno;
    goto _EXIT;
  }

  // keep the last byte as terminator for sscanf()
  if (taosReadFile(pFile, buf, sizeof(buf) - 1) <= 0) {
    code = terrno;
    goto _EXIT;
  }

  p = taosMemoryCalloc(1, sizeof(SSChkpMetaOnS3));
  if (p == NULL) {
    code = terrno;
    goto _EXIT;
  }

  n = sscanf(buf, META_ON_S3_FORMATE, p->pCurrName, &p->currChkptId, p->pManifestName, &p->manifestChkptId,
             p->processName, &p->processId);
  if (n != 6) {
    code = TSDB_CODE_INVALID_MSG;
    goto _EXIT;
  }

  if (p->currChkptId != p->manifestChkptId) {
    code = TSDB_CODE_INVALID_MSG;
    goto _EXIT;
  }

  *pMeta = p;
  p = NULL;  // ownership moved to the caller

_EXIT:
  taosMemoryFree(p);
  taosCloseFile(&pFile);
  taosMemoryFree(metaPath);
  return code;
}
327

328
/*
 * Validate a parsed META record against the expected checkpoint id and rename the two files
 * "<path>/<name>_<currChkptId>" (for the "current" and "manifest" names) to "<path>/<name>".
 *
 * @param path   directory holding the downloaded checkpoint files.
 * @param pMeta  parsed META record.
 * @param chkpId checkpoint id both META ids must equal.
 * @return 0 on success; TSDB_CODE_INVALID_CFG on id mismatch, TSDB_CODE_INVALID_PARA on an
 *         empty name, TSDB_CODE_OUT_OF_RANGE on path truncation, or the code of the failing
 *         stat/rename call.
 */
int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t chkpId) {
  int32_t       status = 0;
  const int32_t bufCap = strlen(path) + 64;

  char* from = taosMemoryCalloc(1, bufCap);
  char* to = taosMemoryCalloc(1, bufCap);
  if (from == NULL || to == NULL) {
    status = terrno;
    goto _EXIT;
  }

  if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) {
    status = TSDB_CODE_INVALID_CFG;
    goto _EXIT;
  }

  // strip the "_<id>" suffix from the current file first, then from the manifest file
  char* names[2] = {pMeta->pCurrName, pMeta->pManifestName};
  for (int k = 0; k < 2; ++k) {
    char* name = names[k];
    if (name[0] == '\0') {
      status = TSDB_CODE_INVALID_PARA;
      goto _EXIT;
    }

    int32_t written = snprintf(from, bufCap, "%s%s%s_%" PRId64, path, TD_DIRSEP, name, pMeta->currChkptId);
    if (written <= 0 || written >= bufCap) {
      status = TSDB_CODE_OUT_OF_RANGE;
      goto _EXIT;
    }

    if (taosStatFile(from, NULL, NULL, NULL) != 0) {
      status = terrno;
      goto _EXIT;
    }

    written = snprintf(to, bufCap, "%s%s%s", path, TD_DIRSEP, name);
    if (written <= 0 || written >= bufCap) {
      status = TSDB_CODE_OUT_OF_RANGE;
      goto _EXIT;
    }

    status = taosRenameFile(from, to);
    if (status != 0) {
      goto _EXIT;
    }

    memset(from, 0, bufCap);
    memset(to, 0, bufCap);
  }
  status = 0;

_EXIT:
  taosMemoryFree(from);
  taosMemoryFree(to);
  return status;
}
385
/*
 * Append to `toDel` the two file names "<name>_<currChkptId>" derived from the META record
 * found under `path` (one for the "current" name, one for the "manifest" name).
 *
 * Fixes over the previous version: the META record is now released on every path (it used to
 * leak when the function succeeded), and terrno is captured before any free call.
 *
 * @param path  directory that contains the META file.
 * @param toDel array of `char*`; each pushed string is heap-allocated and owned by the array's
 *              owner. Entries pushed before a later failure are left in the array.
 * @return 0 on success, otherwise the error code of the failing step.
 */
int32_t remoteChkpGetDelFile(char* path, SArray* toDel) {
  int32_t code = 0;

  SSChkpMetaOnS3* pMeta = NULL;
  code = remoteChkp_readMetaData(path, &pMeta);
  if (code != 0) {
    return code;
  }

  for (int i = 0; i < 2; i++) {
    char* key = (i == 0 ? pMeta->pCurrName : pMeta->pManifestName);

    int32_t cap = strlen(key) + 32;
    char*   p = taosMemoryCalloc(1, cap);
    if (p == NULL) {
      code = terrno;
      break;
    }

    int32_t nBytes = snprintf(p, cap, "%s_%" PRId64, key, pMeta->currChkptId);
    if (nBytes <= 0 || nBytes >= cap) {
      taosMemoryFree(p);
      code = TSDB_CODE_OUT_OF_RANGE;
      break;
    }

    if (taosArrayPush(toDel, &p) == NULL) {
      code = terrno;
      taosMemoryFree(p);
      break;
    }
  }

  taosMemoryFree(pMeta);
  return code;
}
420

421
/*
 * Empty a directory by removing it and creating it again. Nothing happens when `pPath` is
 * not an existing directory.
 *
 * Fix over the previous version: the "succ" message is only logged when the directory was
 * actually re-created; a failed taosMkDir() used to be followed by a success log line.
 *
 * @param pPath directory to clean; NULL is logged as an error and ignored.
 * @param id    identifier used as log prefix.
 */
void cleanDir(const char* pPath, const char* id) {
  if (pPath == NULL) {
    stError("%s try to clean dir, but path is NULL", id);
    return;
  }

  if (!taosIsDir(pPath)) {
    return;
  }

  taosRemoveDir(pPath);
  if (taosMkDir(pPath) != 0) {
    stError("%s failed to create dir:%s", id, pPath);
    return;
  }

  stInfo("%s clear dir:%s, succ", id, pPath);
}
435

436
/*
 * Make sure `pPath` exists as a directory.
 *
 * @return 0 when the directory is already there, otherwise whatever taosMulMkDir() returns.
 */
int32_t createDirIfNotExist(const char* pPath) {
  if (taosIsDir(pPath)) {
    return 0;
  }
  return taosMulMkDir(pPath);
}
443

444
/*
 * Rebuild the default state directory from a remotely stored checkpoint (rsync backup type):
 * drop any local copy of the checkpoint, clean the default dir, download the checkpoint data
 * and copy it into the default dir.
 *
 * @param key            task identifier, used for the download and as log prefix.
 * @param checkpointPath local directory the checkpoint is downloaded to.
 * @param checkpointId   id of the checkpoint to fetch.
 * @param defaultPath    state directory to populate.
 * @return 0 on success, otherwise the code of the failing download/copy step.
 */
int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) {
  // a stale local copy must not be mixed with the downloaded data
  if (taosIsDir(checkpointPath)) {
    taosRemoveDir(checkpointPath);
    stDebug("remove local checkpoint data dir:%s succ", checkpointPath);
  }

  cleanDir(defaultPath, key);
  stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);

  int32_t rc = streamTaskDownloadCheckpointData(key, checkpointPath, checkpointId);
  if (rc != 0) {
    stError("failed to download checkpoint data:%s", key);
    return rc;
  }

  stDebug("download remote checkpoint data for checkpointId:%" PRId64 ", %s", checkpointId, key);
  return backendCopyFiles(checkpointPath, defaultPath);
}
463

464
int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) {
×
465
  SSChkpMetaOnS3* pMeta = NULL;
×
466

467
  int32_t code = remoteChkp_readMetaData(chkpPath, &pMeta);
×
468
  if (code != 0) {
×
469
    return code;
×
470
  }
471

472
  if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) {
×
473
    taosMemoryFree(pMeta);
×
474
    return TSDB_CODE_INVALID_PARA;
×
475
  }
476

477
  code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId);
×
478
  if (code != 0) {
×
479
    taosMemoryFree(pMeta);
×
480
    return code;
×
481
  }
482
  taosMemoryFree(pMeta);
×
483

484
  return chkpAddExtraInfo(chkpPath, chkpId, pMeta->processId);
×
485
}
486

487
int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
×
488
  int8_t  rename = 0;
×
489
  int32_t code = streamTaskDownloadCheckpointData(key, chkpPath, chkpId);
×
490
  if (code != 0) {
×
491
    return code;
×
492
  }
493

494
  int32_t cap = strlen(defaultPath) + 32;
×
495

496
  char* defaultTmp = taosMemoryCalloc(1, cap);
×
497
  if (defaultTmp == NULL) {
×
498
    return terrno;
×
499
  }
500

501
  int32_t nBytes = snprintf(defaultPath, cap, "%s%s", defaultPath, "_tmp");
×
502
  if (nBytes <= 0 || nBytes >= cap) {
×
503
    taosMemoryFree(defaultPath);
×
504
    return TSDB_CODE_OUT_OF_RANGE;
×
505
  }
506

507
  if (taosIsDir(defaultTmp)) taosRemoveDir(defaultTmp);
×
508
  if (taosIsDir(defaultPath)) {
×
509
    code = taosRenameFile(defaultPath, defaultTmp);
×
510
    if (code != 0) {
×
511
      goto _EXIT;
×
512
    } else {
513
      rename = 1;
×
514
    }
515
  } else {
516
    code = taosMkDir(defaultPath);
×
517
    if (code != 0) {
×
518
      code = TAOS_SYSTEM_ERROR(ERRNO);
×
519
      goto _EXIT;
×
520
    }
521
  }
522

523
  code = rebuildDataFromS3(chkpPath, chkpId);
×
524
  if (code != 0) {
×
525
    goto _EXIT;
×
526
  }
527

528
  code = backendCopyFiles(chkpPath, defaultPath);
×
529
  if (code != 0) {
×
530
    goto _EXIT;
×
531
  }
532
  code = 0;
×
533

534
_EXIT:
×
535
  if (code != 0) {
×
536
    if (rename) {
×
537
      TAOS_UNUSED(taosRenameFile(defaultTmp, defaultPath));
×
538
    }
539
  }
540

541
  if (taosIsDir(defaultPath)) {
×
542
    taosRemoveDir(defaultPath);
×
543
  }
544

545
  taosMemoryFree(defaultTmp);
×
546
  return code;
×
547
}
548

549
int32_t rebuildFromRemoteCheckpoint(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) {
5✔
550
  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
5✔
551
  if (type == DATA_UPLOAD_S3) {
5!
552
    return rebuildFromRemoteChkp_s3(key, checkpointPath, checkpointId, defaultPath);
×
553
  } else if (type == DATA_UPLOAD_RSYNC) {
5!
554
    return rebuildFromRemoteChkp_rsync(key, checkpointPath, checkpointId, defaultPath);
×
555
  } else {
556
    stError("%s no remote backup checkpoint data for:%" PRId64, key, checkpointId);
5!
557
  }
558

559
  return -1;
5✔
560
}
561

562
int32_t copyFiles_create(char* src, char* dst, int8_t type) {
26✔
563
  // create and copy file
564
  int32_t err = taosCopyFile(src, dst);
26✔
565

566
  if (ERRNO == EXDEV || ERRNO == ENOTSUP) {
26!
567
    SET_ERRNO(0);
×
568
    return 0;
×
569
  }
570
  return 0;
26✔
571
}
572
/*
 * Link `src` to `dst` through taosLinkFile() (the original comment describes this as a hard
 * link on the same file system).
 *
 * @param type unused.
 * @return the result of taosLinkFile().
 */
int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) {
  (void)type;
  int32_t rc = taosLinkFile(src, dst);
  return rc;
}
576

577
int32_t backendFileCopyFilesImpl(const char* src, const char* dst) {
13✔
578
  const char* current = "CURRENT";
13✔
579
  size_t      currLen = strlen(current);
13✔
580

581
  const char* info = "info";
13✔
582
  size_t      infoLen = strlen(info);
13✔
583

584
  int32_t code = 0;
13✔
585
  int32_t sLen = strlen(src);
13✔
586
  int32_t dLen = strlen(dst);
13✔
587
  int32_t cap = TMAX(sLen, dLen) + 64;
13✔
588
  int32_t nBytes = 0;
13✔
589

590
  char* srcName = taosMemoryCalloc(1, cap);
13!
591
  char* dstName = taosMemoryCalloc(1, cap);
13!
592
  if (srcName == NULL || dstName == NULL) {
13!
593
    taosMemoryFree(srcName);
×
594
    taosMemoryFree(dstName);
×
595
    return terrno;
×
596
  }
597

598
  // copy file to dst
599
  TdDirPtr pDir = taosOpenDir(src);
13✔
600
  if (pDir == NULL) {
13!
601
    code = terrno;
×
602
    goto _ERROR;
×
603
  }
604

605
  SET_ERRNO(0);
13✔
606
  TdDirEntryPtr de = NULL;
13✔
607
  while ((de = taosReadDir(pDir)) != NULL) {
128✔
608
    char* name = taosGetDirEntryName(de);
115✔
609
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) {
115✔
610
      continue;
26✔
611
    }
612

613
    nBytes = snprintf(srcName, cap, "%s%s%s", src, TD_DIRSEP, name);
89✔
614
    if (nBytes <= 0 || nBytes >= cap) {
89!
615
      code = TSDB_CODE_OUT_OF_RANGE;
×
616
      goto _ERROR;
×
617
    }
618

619
    nBytes = snprintf(dstName, cap, "%s%s%s", dst, TD_DIRSEP, name);
89✔
620
    if (nBytes <= 0 || nBytes >= cap) {
89!
621
      code = TSDB_CODE_OUT_OF_RANGE;
×
622
      goto _ERROR;
×
623
    }
624

625
    if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) {
89✔
626
      code = copyFiles_create(srcName, dstName, 0);
13✔
627
      if (code != 0) {
13!
628
        code = TAOS_SYSTEM_ERROR(ERRNO);
×
629
        stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
×
630
        goto _ERROR;
×
631
      }
632
    } else if (strncmp(name, info, strlen(name) <= infoLen ? strlen(name) : infoLen) == 0) {
76✔
633
      code = copyFiles_create(srcName, dstName, 0);
13✔
634
      if (code != 0) {
13!
635
        code = TAOS_SYSTEM_ERROR(ERRNO);
×
636
        stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
×
637
        goto _ERROR;
×
638
      }
639

640
    } else {
641
      code = copyFiles_hardlink(srcName, dstName, 0);
63✔
642
      if (code != 0) {
63!
643
        code = TAOS_SYSTEM_ERROR(ERRNO);
×
644
        stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code));
×
645
        goto _ERROR;
×
646
      } else {
647
        stDebug("succ hard link file:%s to %s", srcName, dstName);
63✔
648
      }
649
    }
650

651
    memset(srcName, 0, cap);
89✔
652
    memset(dstName, 0, cap);
89✔
653
  }
654

655
  taosMemoryFreeClear(srcName);
13!
656
  taosMemoryFreeClear(dstName);
13!
657
  TAOS_UNUSED(taosCloseDir(&pDir));
13✔
658
  return code;
13✔
659

660
_ERROR:
×
661
  taosMemoryFreeClear(srcName);
×
662
  taosMemoryFreeClear(dstName);
×
663
  TAOS_UNUSED(taosCloseDir(&pDir));
×
664
  return code;
×
665
}
666

667
/*
 * Copy/link the content of directory `src` into directory `dst`.
 * Thin wrapper around backendFileCopyFilesImpl(); returns its result unchanged.
 */
int32_t backendCopyFiles(const char* src, const char* dst) {
  int32_t rc = backendFileCopyFilesImpl(src, dst);
  return rc;
}
13✔
668

669
static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId,
18✔
670
                                          const char* defaultPath, int64_t* processVer) {
671
  int32_t code = 0;
18✔
672
  cleanDir(defaultPath, pTaskIdStr);
18✔
673

674
  if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) {
18!
675
    stDebug("%s local checkpoint data existed, checkpointId:%" PRId64 " copy to backend dir", pTaskIdStr, checkpointId);
13✔
676

677
    code = backendCopyFiles(checkpointPath, defaultPath);
13✔
678
    if (code != TSDB_CODE_SUCCESS) {
13!
679
      cleanDir(defaultPath, pTaskIdStr);
×
680
      stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote",
×
681
              pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(code)));
682
      code = TSDB_CODE_SUCCESS;
×
683
    } else {
684
      stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath,
13!
685
             defaultPath);
686
    }
687
  } else {
688
    code = terrno;
5✔
689
    stError("%s no valid data for checkpointId:%" PRId64 " in %s", pTaskIdStr, checkpointId, checkpointPath);
5!
690
  }
691

692
  return code;
18✔
693
}
694

695
int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath,
2,703✔
696
                              int64_t* processVer) {
697
  int32_t code = 0;
2,703✔
698

699
  char* prefixPath = NULL;
2,703✔
700
  char* defaultPath = NULL;
2,703✔
701
  char* checkpointPath = NULL;
2,703✔
702
  char* checkpointRoot = NULL;
2,703✔
703

704
  int32_t cap = strlen(path) + 128;
2,703✔
705
  int32_t nBytes;
706

707
  // alloc buf
708
  prefixPath = taosMemoryCalloc(1, cap);
2,703!
709
  defaultPath = taosMemoryCalloc(1, cap);
2,705!
710
  checkpointPath = taosMemoryCalloc(1, cap);
2,705!
711
  checkpointRoot = taosMemoryCalloc(1, cap);
2,705!
712
  if (prefixPath == NULL || defaultPath == NULL || checkpointPath == NULL || checkpointRoot == NULL) {
2,705!
713
    code = terrno;
×
714
    goto _EXIT;
×
715
  }
716

717
  nBytes = snprintf(prefixPath, cap, "%s%s%s", path, TD_DIRSEP, key);
2,705✔
718
  if (nBytes <= 0 || nBytes >= cap) {
2,705!
719
    code = TSDB_CODE_OUT_OF_RANGE;
×
720
    goto _EXIT;
×
721
  }
722

723
  code = createDirIfNotExist(prefixPath);
2,705✔
724
  if (code != 0) {
2,705!
725
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
726
    goto _EXIT;
×
727
  }
728

729
  nBytes = snprintf(defaultPath, cap, "%s%s%s", prefixPath, TD_DIRSEP, "state");
2,705✔
730
  if (nBytes <= 0 || nBytes >= cap) {
2,705!
731
    code = TSDB_CODE_OUT_OF_RANGE;
×
732
    goto _EXIT;
×
733
  }
734

735
  code = createDirIfNotExist(defaultPath);
2,705✔
736
  if (code != 0) {
2,705!
737
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
738
    goto _EXIT;
×
739
  }
740

741
  nBytes = snprintf(checkpointRoot, cap, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints");
2,705✔
742
  if (nBytes <= 0 || nBytes >= cap) {
2,705!
743
    code = TSDB_CODE_OUT_OF_RANGE;
×
744
    goto _EXIT;
×
745
  }
746

747
  code = createDirIfNotExist(checkpointRoot);
2,705✔
748
  if (code != 0) {
2,705!
749
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
750
    goto _EXIT;
×
751
  }
752

753
  stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId);
2,705✔
754
  if (chkptId > 0) {
2,705✔
755
    nBytes = snprintf(checkpointPath, cap, "%s%s%s%s%s%" PRId64, prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP,
18✔
756
                      "checkpoint", chkptId);
757
    if (nBytes <= 0 || nBytes >= cap) {
18!
758
      code = TSDB_CODE_OUT_OF_RANGE;
×
759
      goto _EXIT;
×
760
    }
761

762
    code = rebuildFromLocalCheckpoint(key, checkpointPath, chkptId, defaultPath, processVer);
18✔
763
    if (code != 0) {
18✔
764
      code = rebuildFromRemoteCheckpoint(key, checkpointPath, chkptId, defaultPath);
5✔
765
    }
766

767
    if (code != 0) {
18✔
768
      stError("failed to start stream backend at %s, restart from defaultPath:%s, reason:%s", checkpointPath,
5!
769
              defaultPath, tstrerror(code));
770
      code = 0;  // reset the error code
5✔
771
    }
772
  } else {  // no valid checkpoint id
773
    stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key,
2,687!
774
           defaultPath);
775
    cleanDir(defaultPath, key);
2,687✔
776
  }
777

778
  *dbPath = defaultPath;
2,705✔
779
  *dbPrefixPath = prefixPath;
2,705✔
780
  defaultPath = NULL;
2,705✔
781
  prefixPath = NULL;
2,705✔
782

783
  code = 0;
2,705✔
784

785
_EXIT:
2,705✔
786
  taosMemoryFree(defaultPath);
2,705!
787
  taosMemoryFree(prefixPath);
2,705!
788
  taosMemoryFree(checkpointPath);
2,705!
789
  taosMemoryFree(checkpointRoot);
2,705!
790
  return code;
2,705✔
791
}
792
bool streamBackendDataIsExist(const char* path, int64_t chkpId) {
57✔
793
  bool    exist = true;
57✔
794
  int32_t cap = strlen(path) + 32;
57✔
795

796
  char* state = taosMemoryCalloc(1, cap);
57!
797
  if (state == NULL) {
57!
798
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
799
    return false;
×
800
  }
801

802
  int16_t nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state");
57✔
803
  if (nBytes <= 0 || nBytes >= cap) {
57!
804
    terrno = TSDB_CODE_OUT_OF_RANGE;
×
805
    exist = false;
×
806
  } else {
807
    if (!taosDirExist(state)) {
57!
808
      exist = false;
57✔
809
    }
810
  }
811

812
  taosMemoryFree(state);
57!
813
  return exist;
57✔
814
}
815

816
/*
 * Build the on-disk directory for a stream backend and open the RocksDB instance stored in it.
 *
 * streamPath - root directory handed to rebuildDirFromCheckpoint().
 * chkpId     - checkpoint id handed to rebuildDirFromCheckpoint().
 * vgId       - vnode group id recorded in the wrapper and used in log lines.
 * pBackend   - out: receives the new wrapper on success, NULL on failure.
 *
 * Returns 0 on success, otherwise the error code of the step that failed. A failure reported by
 * RocksDB while opening the database is returned as TSDB_CODE_THIRDPARTY_ERROR.
 *
 * Every resource released under _EXIT is declared and nulled before the first jump, so an early
 * failure never touches an indeterminate pointer or an uninitialised mutex.
 */
int32_t streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId, SBackendWrapper** pBackend) {
  char*   backendPath = NULL;
  int32_t code = 0;
  int32_t lino = 0;
  char*   err = NULL;
  size_t  nCf = 0;
  char**  cfs = NULL;

  SBackendWrapper*   pHandle = NULL;
  rocksdb_env_t*     env = NULL;
  rocksdb_cache_t*   cache = NULL;
  rocksdb_options_t* opts = NULL;
  bool               mutexInited = false;
  bool               cfMutexInited = false;

  *pBackend = NULL;

  code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
  TSDB_CHECK_CODE(code, lino, _EXIT);

  stDebug("start to init stream backend:%s, checkpointId:%" PRId64 " vgId:%d", backendPath, chkpId, vgId);

  uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;

  pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
  TSDB_CHECK_NULL(pHandle, code, lino, _EXIT, terrno);

  pHandle->list = tdListNew(sizeof(SCfComparator));
  TSDB_CHECK_NULL(pHandle->list, code, lino, _EXIT, terrno);

  code = taosThreadMutexInit(&pHandle->mutex, NULL);
  TSDB_CHECK_CODE(code, lino, _EXIT);
  mutexInited = true;

  code = taosThreadMutexInit(&pHandle->cfMutex, NULL);
  TSDB_CHECK_CODE(code, lino, _EXIT);
  cfMutexInited = true;

  pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pHandle->cfInst, code, lino, _EXIT, terrno);

  pHandle->vgId = vgId;

  env = rocksdb_create_default_env();  // rocksdb_envoptions_create();

  // Half of the configured snode stream threads (at least one) for each RocksDB priority pool.
  int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2;
  rocksdb_env_set_low_priority_background_threads(env, nBGThread);
  rocksdb_env_set_high_priority_background_threads(env, nBGThread);

  cache = rocksdb_cache_create_lru(dbMemLimit / 2);

  opts = rocksdb_options_create();
  rocksdb_options_set_env(opts, env);
  rocksdb_options_set_create_if_missing(opts, 1);
  rocksdb_options_set_create_missing_column_families(opts, 1);
  rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
  rocksdb_options_set_recycle_log_file_num(opts, 6);
  rocksdb_options_set_max_write_buffer_number(opts, 3);
  rocksdb_options_set_info_log_level(opts, 1);
  rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit);
  rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2);
  rocksdb_options_set_atomic_flush(opts, 1);

  pHandle->env = env;
  pHandle->dbOpt = opts;
  pHandle->cache = cache;
  pHandle->filterFactory = rocksdb_compactionfilterfactory_create(
      NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
  rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory);

  cfs = rocksdb_list_column_families(opts, backendPath, &nCf, &err);
  if (nCf == 0 || nCf == 1 || err != NULL) {
    // Nothing beyond the default column family (or the listing failed): plain open.
    taosMemoryFreeClear(err);
    pHandle->db = rocksdb_open(opts, backendPath, &err);
    if (err != NULL) {
      stError("failed to open rocksdb, path:%s, reason:%s", backendPath, err);
      taosMemoryFreeClear(err);
      // Without an explicit code the caller would receive 0 together with a NULL backend.
      code = TSDB_CODE_THIRDPARTY_ERROR;
      lino = __LINE__;
      goto _EXIT;
    }
  } else {
    /*
      list all cf and get prefix
    */
    code = streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf);
    if (code != 0) {
      lino = __LINE__;
      goto _EXIT;
    }
  }

  if (cfs != NULL) {
    rocksdb_list_column_families_destroy(cfs, nCf);
  }

  stDebug("init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId);
  taosMemoryFreeClear(backendPath);

  *pBackend = pHandle;
  return code;

_EXIT:
  if (cfs != NULL) {
    rocksdb_list_column_families_destroy(cfs, nCf);
  }
  if (opts != NULL) {
    rocksdb_options_destroy(opts);
  }
  if (cache != NULL) {
    rocksdb_cache_destroy(cache);
  }
  if (env != NULL) {
    rocksdb_env_destroy(env);
  }
  if (pHandle != NULL) {
    if (mutexInited) {
      streamMutexDestroy(&pHandle->mutex);
    }
    if (cfMutexInited) {
      streamMutexDestroy(&pHandle->cfMutex);
    }
    taosHashCleanup(pHandle->cfInst);
    if (pHandle->list != NULL) {
      pHandle->list = tdListFree(pHandle->list);
    }
    taosMemoryFree(pHandle);
  }
  stDebug("failed to init stream backend at %s, vgId:%d line:%d code:%s", backendPath != NULL ? backendPath : "",
          vgId, lino, tstrerror(code));
  taosMemoryFree(backendPath);
  return code;
}
919

920
void streamBackendCleanup(void* arg) {
2✔
921
  SBackendWrapper* pHandle = (SBackendWrapper*)arg;
2✔
922

923
  void* pIter = taosHashIterate(pHandle->cfInst, NULL);
2✔
924
  while (pIter != NULL) {
2!
925
    RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
×
926
    destroyRocksdbCfInst(inst);
×
927
    pIter = taosHashIterate(pHandle->cfInst, pIter);
×
928
  }
929

930
  taosHashCleanup(pHandle->cfInst);
2✔
931

932
  if (pHandle->db) {
2!
933
    rocksdb_close(pHandle->db);
2✔
934
    pHandle->db = NULL;
2✔
935
  }
936

937
  rocksdb_options_destroy(pHandle->dbOpt);
2✔
938
  rocksdb_env_destroy(pHandle->env);
2✔
939
  rocksdb_cache_destroy(pHandle->cache);
2✔
940

941
  SListNode* head = tdListPopHead(pHandle->list);
2✔
942
  while (head != NULL) {
2!
943
    streamStateDestroyCompar(head->data);
×
944
    taosMemoryFree(head);
×
945
    head = tdListPopHead(pHandle->list);
×
946
  }
947

948
  pHandle->list = tdListFree(pHandle->list);
2✔
949
  streamMutexDestroy(&pHandle->mutex);
2✔
950

951
  streamMutexDestroy(&pHandle->cfMutex);
2✔
952
  stDebug("vgId:%d destroy stream backend:%p", (int32_t)pHandle->vgId, pHandle);
2!
953
  taosMemoryFree(pHandle);
2!
954
}
2✔
955

956
void streamBackendHandleCleanup(void* arg) {
×
957
  SBackendCfWrapper* wrapper = arg;
×
958
  bool               remove = wrapper->remove;
×
959
  TAOS_UNUSED(taosThreadRwlockWrlock(&wrapper->rwLock));
×
960

961
  stDebug("start to do-close backendWrapper %p, %s", wrapper, wrapper->idstr);
×
962
  if (wrapper->rocksdb == NULL) {
×
963
    TAOS_UNUSED(taosThreadRwlockUnlock(&wrapper->rwLock));
×
964
    return;
×
965
  }
966

967
  int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
×
968

969
  char* err = NULL;
×
970
  if (remove) {
×
971
    for (int i = 0; i < cfLen; i++) {
×
972
      if (wrapper->pHandle[i] != NULL) rocksdb_drop_column_family(wrapper->rocksdb, wrapper->pHandle[i], &err);
×
973
      if (err != NULL) {
×
974
        stError("failed to drop cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
×
975
        taosMemoryFreeClear(err);
×
976
      }
977
    }
978
  } else {
979
    rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
×
980
    rocksdb_flushoptions_set_wait(flushOpt, 1);
×
981

982
    for (int i = 0; i < cfLen; i++) {
×
983
      if (wrapper->pHandle[i] != NULL) rocksdb_flush_cf(wrapper->rocksdb, flushOpt, wrapper->pHandle[i], &err);
×
984
      if (err != NULL) {
×
985
        stError("failed to flush cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
×
986
        taosMemoryFreeClear(err);
×
987
      }
988
    }
989
    rocksdb_flushoptions_destroy(flushOpt);
×
990
  }
991

992
  for (int i = 0; i < cfLen; i++) {
×
993
    if (wrapper->pHandle[i] != NULL) {
×
994
      rocksdb_column_family_handle_destroy(wrapper->pHandle[i]);
×
995
    }
996
  }
997
  taosMemoryFreeClear(wrapper->pHandle);
×
998

999
  for (int i = 0; i < cfLen; i++) {
×
1000
    rocksdb_options_destroy(wrapper->cfOpts[i]);
×
1001
    rocksdb_block_based_options_destroy(((RocksdbCfParam*)wrapper->param)[i].tableOpt);
×
1002
  }
1003

1004
  if (remove) {
×
1005
    streamBackendDelCompare(wrapper->pBackend, wrapper->pComparNode);
×
1006
  }
1007
  rocksdb_writeoptions_destroy(wrapper->writeOpts);
×
1008
  wrapper->writeOpts = NULL;
×
1009

1010
  rocksdb_readoptions_destroy(wrapper->readOpts);
×
1011
  wrapper->readOpts = NULL;
×
1012
  taosMemoryFreeClear(wrapper->cfOpts);
×
1013
  taosMemoryFreeClear(wrapper->param);
×
1014
  TAOS_UNUSED(taosThreadRwlockUnlock(&wrapper->rwLock));
×
1015

1016
  TAOS_UNUSED(taosThreadRwlockDestroy(&wrapper->rwLock));
×
1017
  wrapper->rocksdb = NULL;
×
1018
  // taosReleaseRef(streamBackendId, wrapper->backendId);
1019

1020
  stDebug("end to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
×
1021
  taosMemoryFree(wrapper);
×
1022
  return;
×
1023
}
1024

1025
#ifdef BUILD_NO_CALL
1026
int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) {
1027
  SStreamMeta* pMeta = arg;
1028
  taosWLockLatch(&pMeta->chkpDirLock);
1029
  int64_t tc = 0;
1030
  int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
1031
  if (sz <= 0) {
1032
    taosWUnLockLatch(&pMeta->chkpDirLock);
1033
    return -1;
1034
  } else {
1035
    tc = *(int64_t*)taosArrayGetLast(pMeta->chkpSaved);
1036
  }
1037

1038
  taosArrayPush(pMeta->chkpInUse, &tc);
1039

1040
  *checkpoint = tc;
1041
  taosWUnLockLatch(&pMeta->chkpDirLock);
1042
  return 0;
1043
}
1044
/*
1045
 *  checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
1046
 *  chkpInUse: |--cp2--|--cp4--|
1047
 *  chkpInUse is doing translation, cannot del until
1048
 *  replication is finished
1049
 */
1050
int32_t delObsoleteCheckpoint(void* arg, const char* path) {
1051
  SStreamMeta* pMeta = arg;
1052

1053
  taosWLockLatch(&pMeta->chkpDirLock);
1054

1055
  SArray* chkpDel = taosArrayInit(10, sizeof(int64_t));
1056
  SArray* chkpDup = taosArrayInit(10, sizeof(int64_t));
1057

1058
  int64_t firsId = 0;
1059
  if (taosArrayGetSize(pMeta->chkpInUse) >= 1) {
1060
    firsId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
1061

1062
    for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) {
1063
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1064
      if (id >= firsId) {
1065
        taosArrayPush(chkpDup, &id);
1066
      } else {
1067
        taosArrayPush(chkpDel, &id);
1068
      }
1069
    }
1070
  } else {
1071
    int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
1072
    int32_t dsz = sz - pMeta->chkpCap;  // del size
1073

1074
    for (int i = 0; i < dsz; i++) {
1075
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1076
      taosArrayPush(chkpDel, &id);
1077
    }
1078
    for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
1079
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1080
      taosArrayPush(chkpDup, &id);
1081
    }
1082
  }
1083
  taosArrayDestroy(pMeta->chkpSaved);
1084
  pMeta->chkpSaved = chkpDup;
1085

1086
  taosWUnLockLatch(&pMeta->chkpDirLock);
1087

1088
  for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
1089
    int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
1090
    char    tbuf[256] = {0};
1091
    sprintf(tbuf, "%s%scheckpoint%" PRId64, path, TD_DIRSEP, id);
1092
    if (taosIsDir(tbuf)) {
1093
      taosRemoveDir(tbuf);
1094
    }
1095
  }
1096
  taosArrayDestroy(chkpDel);
1097
  return 0;
1098
}
1099
#endif
1100
/*
1101
 *  checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
1102
 *  chkpInUse: |--cp2--|--cp4--|
1103
 *  chkpInUse is doing translation, cannot del until
1104
 *  replication is finished
1105
 */
1106
/*
 * Record chkpId as saved, work out which saved checkpoints are obsolete, and remove their
 * directories.
 *
 * arg          - the STaskDbWrapper that owns chkpSaved / chkpInUse / chkpCap.
 * chkpId       - id of the checkpoint that was just written; appended to chkpSaved.
 * path         - directory that contains the "checkpoint<id>" sub-directories.
 * pDroppedList - out: receives the ids selected for deletion.
 *
 * If chkpInUse is non-empty, every saved id below its first entry is dropped. Otherwise the
 * oldest entries beyond chkpCap are dropped. The selection runs under the chkpDirLock write
 * lock; the directories are removed after the lock is released.
 *
 * Returns 0 on success, the failing code if an array operation fails, or TSDB_CODE_OUT_OF_RANGE
 * if a checkpoint path does not fit into the local buffer.
 */
int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path, SArray* pDroppedList) {
  int32_t         code = 0;
  STaskDbWrapper* pBackend = arg;
  SArray*         chkpDup = NULL;
  TAOS_UNUSED(taosThreadRwlockWrlock(&pBackend->chkpDirLock));

  if (taosArrayPush(pBackend->chkpSaved, &chkpId) == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
  }

  chkpDup = taosArrayInit(8, sizeof(int64_t));
  if (chkpDup == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
  }

  int64_t firsId = 0;
  if (taosArrayGetSize(pBackend->chkpInUse) >= 1) {
    firsId = *(int64_t*)taosArrayGet(pBackend->chkpInUse, 0);

    for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) {
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
      if (id >= firsId) {
        if (taosArrayPush(chkpDup, &id) == NULL) {
          TAOS_CHECK_GOTO(terrno, NULL, _exception);
        }
      } else {
        if (taosArrayPush(pDroppedList, &id) == NULL) {
          TAOS_CHECK_GOTO(terrno, NULL, _exception);
        }
      }
    }
  } else {
    int32_t sz = taosArrayGetSize(pBackend->chkpSaved);
    int32_t dsz = sz - pBackend->chkpCap;  // del size

    for (int i = 0; i < dsz; i++) {
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
      if (taosArrayPush(pDroppedList, &id) == NULL) {
        TAOS_CHECK_GOTO(terrno, NULL, _exception);
      }
    }
    for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
      if (taosArrayPush(chkpDup, &id) == NULL) {
        TAOS_CHECK_GOTO(terrno, NULL, _exception);
      }
    }
  }

  taosArrayDestroy(pBackend->chkpSaved);
  pBackend->chkpSaved = chkpDup;
  chkpDup = NULL;

  TAOS_UNUSED(taosThreadRwlockUnlock(&pBackend->chkpDirLock));

  // The lock is no longer held from here on. Failures must return directly: the _exception
  // label releases the lock and would unlock it a second time.
  for (int i = 0; i < taosArrayGetSize(pDroppedList); i++) {
    int64_t id = *(int64_t*)taosArrayGet(pDroppedList, i);
    char    tbuf[256] = {0};

    int n = snprintf(tbuf, sizeof(tbuf), "%s%scheckpoint%" PRId64, path, TD_DIRSEP, id);
    if (n < 0 || (size_t)n >= sizeof(tbuf)) {
      return TSDB_CODE_OUT_OF_RANGE;
    }

    stInfo("backend remove expired checkpoint: %s, remaining:%d", tbuf, pBackend->chkpCap);
    if (taosIsDir(tbuf)) {
      taosRemoveDir(tbuf);
    }
  }

  return 0;

_exception:
  // Only reached while the write lock is still held.
  taosArrayDestroy(chkpDup);
  TAOS_UNUSED(taosThreadRwlockUnlock(&pBackend->chkpDirLock));
  return code;
}
1182

1183
/*
 * Three-way comparator for two int64_t checkpoint ids, passed to taosArraySort().
 *
 * a, b - pointers to the int64_t values to compare.
 *
 * Returns -1 when *a < *b, 1 when *a > *b and 0 when they are equal.
 */
int chkpIdComp(const void* a, const void* b) {
  const int64_t lhs = *(int64_t*)a;
  const int64_t rhs = *(int64_t*)b;

  // Each comparison yields 0 or 1, so the difference is exactly -1, 0 or 1.
  return (lhs > rhs) - (lhs < rhs);
}
1190
/*
 * Scan "<pBackend->path><TD_DIRSEP>checkpoints" and load the ids of all "checkpoint<id>"
 * sub-directories into pBackend->chkpSaved, sorted with chkpIdComp.
 *
 * pBackend - task db wrapper whose chkpSaved array is filled.
 *
 * Returns 0 on success, and also when the checkpoints directory is missing or cannot be opened.
 * Returns terrno if the path buffer cannot be allocated, TSDB_CODE_OUT_OF_RANGE if the path does
 * not fit into it, or the failing code if an id cannot be appended.
 */
int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
  int32_t code = 0;
  int32_t nBytes = 0;
  int32_t cap = 256;
  char*   pChkpDir = taosMemoryCalloc(1, cap);
  if (pChkpDir == NULL) {
    return terrno;
  }

  nBytes = snprintf(pChkpDir, cap, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
  if (nBytes <= 0 || nBytes >= cap) {
    // The buffer is owned by this function on every path, including this one.
    taosMemoryFree(pChkpDir);
    return TSDB_CODE_OUT_OF_RANGE;
  }
  if (!taosIsDir(pChkpDir)) {
    taosMemoryFree(pChkpDir);
    return 0;
  }
  TdDirPtr pDir = taosOpenDir(pChkpDir);
  if (pDir == NULL) {
    taosMemoryFree(pChkpDir);
    return 0;
  }
  TdDirEntryPtr de = NULL;
  while ((de = taosReadDir(pDir)) != NULL) {
    if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;

    // Only directories can be checkpoints; everything else is skipped.
    if (!taosDirEntryIsDir(de)) {
      continue;
    }

    int64_t checkpointId = 0;
    int     ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64, &checkpointId);
    if (ret == 1) {
      if (taosArrayPush(pBackend->chkpSaved, &checkpointId) == NULL) {
        TAOS_CHECK_GOTO(terrno, NULL, _exception);
      }
    }
  }
  taosArraySort(pBackend->chkpSaved, chkpIdComp);

  taosMemoryFree(pChkpDir);
  TAOS_UNUSED(taosCloseDir(&pDir));

  return 0;
_exception:
  taosMemoryFree(pChkpDir);
  TAOS_UNUSED(taosCloseDir(&pDir));
  return code;
}
1241
/*
 * Collect the non-NULL column-family handles of pBackend into a newly allocated array.
 *
 * pBackend - task db wrapper whose pCf[] table is scanned (one slot per ginitDict entry).
 * ppHandle - out: receives the array when at least one handle exists; the caller frees it with
 *            taosMemoryFree(). Left untouched when the result is 0 or an error.
 *
 * Returns the number of handles stored in *ppHandle (0 when none are open), or the terrno value
 * of the allocation that failed.
 */
int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
  int32_t code = 0;
  SArray* pHandle = taosArrayInit(8, POINTER_BYTES);
  if (pHandle == NULL) {
    // Nothing to clean up yet; report the allocation failure directly.
    return terrno;
  }

  for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
    if (pBackend->pCf[i]) {
      rocksdb_column_family_handle_t* p = pBackend->pCf[i];
      if (taosArrayPush(pHandle, &p) == NULL) {
        code = terrno;
        goto _exception;
      }
    }
  }
  int32_t nCf = taosArrayGetSize(pHandle);
  if (nCf == 0) {
    taosArrayDestroy(pHandle);
    return nCf;
  }

  rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
  if (ppCf == NULL) {
    // Leave unconditionally so the copy loop below never sees a NULL destination.
    code = terrno;
    goto _exception;
  }
  for (int i = 0; i < nCf; i++) {
    ppCf[i] = taosArrayGetP(pHandle, i);
  }

  taosArrayDestroy(pHandle);

  *ppHandle = ppCf;
  return nCf;
_exception:
  taosArrayDestroy(pHandle);
  return code;
}
1275

1276
/*
 * Create a RocksDB checkpoint of db in the directory path.
 *
 * db   - open database to snapshot.
 * path - target directory passed to rocksdb_checkpoint_create().
 *
 * Returns 0 on success or TSDB_CODE_THIRDPARTY_ERROR if RocksDB reports an error while creating
 * the checkpoint object or the checkpoint itself.
 */
int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
  int32_t               code = -1;
  char*                 err = NULL;
  rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(db, &err);
  if (cp == NULL || err != NULL) {
    // err may be NULL when only the object is missing; never hand NULL to "%s".
    stError("failed to do checkpoint at:%s, reason:%s", path, err != NULL ? err : "unknown");
    taosMemoryFreeClear(err);
    code = TSDB_CODE_THIRDPARTY_ERROR;
    goto _ERROR;
  }
  rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err);
  if (err != NULL) {
    stError("failed to do checkpoint at:%s, reason:%s", path, err);
    taosMemoryFreeClear(err);
    code = TSDB_CODE_THIRDPARTY_ERROR;
  } else {
    code = 0;
  }
_ERROR:
  // Skip the destroy call when object creation failed and left cp NULL.
  if (cp != NULL) {
    rocksdb_checkpoint_object_destroy(cp);
  }
  return code;
}
1298

1299
/*
 * Flush the given column families of db and wait until the flush has finished.
 *
 * db  - open database.
 * cf  - array of column-family handles to flush.
 * nCf - number of entries in cf.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the flush options cannot be created, or
 * TSDB_CODE_THIRDPARTY_ERROR if RocksDB reports a flush error.
 */
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
  rocksdb_flushoptions_t* pOpt = rocksdb_flushoptions_create();
  if (pOpt == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  rocksdb_flushoptions_set_wait(pOpt, 1);

  char* errMsg = NULL;
  rocksdb_flush_cfs(db, pOpt, cf, nCf, &errMsg);

  int ret = 0;
  if (errMsg != NULL) {
    stError("failed to flush db before streamBackend clean up, reason:%s", errMsg);
    taosMemoryFree(errMsg);
    ret = TSDB_CODE_THIRDPARTY_ERROR;
  }

  rocksdb_flushoptions_destroy(pOpt);
  return ret;
}
1319

1320
int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) {
906✔
1321
  int32_t code = 0;
906✔
1322
  int32_t cap = strlen(path) + 256;
906✔
1323
  int32_t nBytes = 0;
906✔
1324

1325
  char* pChkpDir = taosMemoryCalloc(1, cap);
906!
1326
  char* pChkpIdDir = taosMemoryCalloc(1, cap);
906!
1327
  if (pChkpDir == NULL || pChkpIdDir == NULL) {
906!
1328
    code = terrno;
×
1329
    goto _EXIT;
×
1330
  }
1331

1332
  nBytes = snprintf(pChkpDir, cap, "%s%s%s", path, TD_DIRSEP, "checkpoints");
906✔
1333
  if (nBytes <= 0 || nBytes >= cap) {
906!
1334
    code = TSDB_CODE_OUT_OF_RANGE;
×
1335
    goto _EXIT;
×
1336
  }
1337

1338
  nBytes = snprintf(pChkpIdDir, cap, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId);
906✔
1339
  if (nBytes <= 0 || nBytes >= cap) {
906!
1340
    code = TSDB_CODE_OUT_OF_RANGE;
×
1341
    goto _EXIT;
×
1342
  }
1343

1344
  code = taosMulModeMkDir(pChkpDir, 0755, true);
906✔
1345
  if (code != 0) {
906!
1346
    code = terrno;
×
1347
    stError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
×
1348
    goto _EXIT;
×
1349
  }
1350

1351
  if (taosIsDir(pChkpIdDir)) {
906✔
1352
    stInfo("stream rm exist checkpoint%s", pChkpIdDir);
1!
1353
    taosRemoveDir(pChkpIdDir);
1✔
1354
  }
1355

1356
  *chkpDir = pChkpDir;
906✔
1357
  *chkpIdDir = pChkpIdDir;
906✔
1358
  return 0;
906✔
1359
_EXIT:
×
1360
  taosMemoryFree(pChkpDir);
×
1361
  taosMemoryFree(pChkpIdDir);
×
1362
  return code;
×
1363
}
1364

1365
/*
 * Checkpoint every task db registered in pMeta->pTaskDbUnique and append one SStreamTaskSnap per
 * db to pSnap.
 *
 * arg   - the SStreamMeta whose task dbs are snapshotted.
 * pSnap - out: array of SStreamTaskSnap; each entry owns its dbPrefixPath string.
 *
 * For every db that can be referenced, its current chkpId is marked in-use, a checkpoint is
 * taken, and a snap entry is pushed. The in-use mark stays in place for entries that were pushed
 * and is undone for the entry that fails. The loop stops at the first failure. Runs under
 * pMeta->backendMutex.
 *
 * Returns 0 on success or the code of the first failure.
 */
int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
  // vnode task->db
  SStreamMeta* pMeta = arg;
  int32_t      code = 0;

  streamMutexLock(&pMeta->backendMutex);
  void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL);

  // NOTE(review): the early exits below leave the hash iteration unfinished, as the original
  // did — confirm whether the hash API requires the iterator to be cancelled.
  while (pIter) {
    STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;

    void* p = taskDbAddRef(pTaskDb);
    if (p == NULL) {
      terrno = 0;
      pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
      continue;
    }

    // add chkpId to in-use-ckpkIdSet
    taskDbRefChkp(pTaskDb, pTaskDb->chkpId);

    SArray* pList = taosArrayInit(4, sizeof(int64_t));
    if (pList == NULL) {
      // Treated like the other allocation failures below: undo both references and stop. A bare
      // `continue` here would revisit the same entry forever and leak a reference each time.
      code = terrno;
      stError("vgId:%d failed to prepare list during build snap, code:%s", pMeta->vgId, tstrerror(code));
      taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
      taskDbRemoveRef(pTaskDb);
      break;
    }

    SStreamTask* pTask = pTaskDb->pTask;
    code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId, pTask->chkInfo.processedVer, pList);
    taosArrayDestroy(pList);

    if (code != 0) {  // remove chkpId from in-use-ckpkIdSet
      taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
      taskDbRemoveRef(pTaskDb);
      break;
    }

    SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
                            .taskId = pTask->id.taskId,
                            .chkpId = pTaskDb->chkpId,
                            .dbPrefixPath = taosStrdup(pTaskDb->path)};
    if (snap.dbPrefixPath == NULL) {
      // remove chkpid from chkp-in-use set
      code = terrno;
      taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
      taskDbRemoveRef(pTaskDb);
      break;
    }
    if (taosArrayPush(pSnap, &snap) == NULL) {
      code = terrno;
      // The entry never reached pSnap, so its duplicated path must be released here.
      taosMemoryFree(snap.dbPrefixPath);
      taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
      taskDbRemoveRef(pTaskDb);
      break;
    }

    taskDbRemoveRef(pTaskDb);
    pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
  }
  streamMutexUnlock(&pMeta->backendMutex);
  return code;
}
1426
/*
 * Release the in-use checkpoint marks for the given snapshot entries.
 *
 * arg       - the SStreamMeta whose pTaskDbUnique hash is searched.
 * pSnapInfo - array of SStreamTaskSnap; NULL is accepted and treated as empty.
 *
 * Each entry is looked up under the key "0x<streamId>-0x<taskId>". Entries without a matching
 * task db only produce a warning. Runs under pMeta->backendMutex.
 *
 * Returns 0, or TSDB_CODE_OUT_OF_RANGE if a lookup key cannot be formatted.
 */
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
  if (pSnapInfo == NULL) {
    return 0;
  }

  SStreamMeta* pMeta = arg;
  int32_t      ret = 0;
  char         key[256] = {0};
  int32_t      keyCap = 256;

  streamMutexLock(&pMeta->backendMutex);

  for (int idx = 0; idx < taosArrayGetSize(pSnapInfo); ++idx) {
    SStreamTaskSnap* pEntry = taosArrayGet(pSnapInfo, idx);

    int32_t keyLen = snprintf(key, keyCap, "0x%" PRIx64 "-0x%x", pEntry->streamId, (int32_t)pEntry->taskId);
    if (keyLen <= 0 || keyLen >= keyCap) {
      ret = TSDB_CODE_OUT_OF_RANGE;
      break;
    }

    STaskDbWrapper** ppDb = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
    bool             found = (ppDb != NULL && *ppDb != NULL);
    if (!found) {
      stWarn("stream backend:%p failed to find task db, streamId:% " PRId64, pMeta, pEntry->streamId);
    }

    // The key buffer is wiped after every lookup, found or not.
    memset(key, 0, sizeof(key));

    if (found) {
      taskDbUnRefChkp(*ppDb, pEntry->chkpId);
    }
  }

  streamMutexUnlock(&pMeta->backendMutex);
  return ret;
}
1455
#ifdef BUILD_NO_CALL
1456
/*
 * No-op stub, compiled only when BUILD_NO_CALL is defined.
 *
 * arg, chkpId - ignored.
 *
 * Always returns 0.
 */
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
  (void)arg;
  (void)chkpId;
  return 0;
}
1465
/*
 * No-op stub, compiled only when BUILD_NO_CALL is defined.
 *
 * arg, chkpId - ignored.
 *
 * Always returns 0.
 */
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
  (void)arg;
  (void)chkpId;
  return 0;
}
1479
#endif
1480

1481
/*
1482
   0
1483
*/
1484

1485
void* taskAcquireDb(int64_t refId) {
1✔
1486
  // acquire
1487
  void* p = taosAcquireRef(taskDbWrapperId, refId);
1✔
1488
  return p;
1✔
1489
}
1490
void taskReleaseDb(int64_t refId) {
1✔
1491
  // release
1492
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
1✔
1493
}
1✔
1494

1495
int64_t taskGetDBRef(void* arg) {
2✔
1496
  if (arg == NULL) return -1;
2!
1497
  STaskDbWrapper* pDb = arg;
2✔
1498
  return pDb->refId;
2✔
1499
}
1500

1501
int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) {
2,708✔
1502
  TdFilePtr pFile = NULL;
2,708✔
1503
  int32_t   code = 0;
2,708✔
1504

1505
  char    buf[256] = {0};
2,708✔
1506
  int32_t nBytes = 0;
2,708✔
1507

1508
  int32_t len = strlen(pChkpIdDir);
2,708✔
1509
  if (len == 0) {
2,708!
1510
    code = TSDB_CODE_INVALID_PARA;
×
1511
    stError("failed to load extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1512
    return code;
×
1513
  }
1514

1515
  int32_t cap = len + 64;
2,708✔
1516
  char*   pDst = taosMemoryCalloc(1, cap);
2,708!
1517
  if (pDst == NULL) {
2,708!
1518
    code = terrno;
×
1519
    stError("failed to alloc memory to load extra info, dir:%s", pChkpIdDir);
×
1520
    goto _EXIT;
×
1521
  }
1522

1523
  nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
2,708✔
1524
  if (nBytes <= 0 || nBytes >= cap) {
2,708!
1525
    code = TSDB_CODE_OUT_OF_RANGE;
×
1526
    stError("failed to build dst to load extra info, dir:%s", pChkpIdDir);
×
1527
    goto _EXIT;
×
1528
  }
1529

1530
  pFile = taosOpenFile(pDst, TD_FILE_READ);
2,708✔
1531
  if (pFile == NULL) {
2,708✔
1532
    // compatible with previous version
1533
    *processId = -1;
2,692✔
1534
    code = 0;
2,692✔
1535
    stWarn("failed to open file to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno));
2,692!
1536
    goto _EXIT;
2,692✔
1537
  }
1538

1539
  if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
16!
1540
    code = terrno;
×
1541
    stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1542
    goto _EXIT;
×
1543
  }
1544

1545
  if (sscanf(buf, "%" PRId64 " %" PRId64, chkpId, processId) < 2) {
16!
1546
    code = TSDB_CODE_INVALID_PARA;
×
1547
    stError("failed to read file content to load extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1548
    goto _EXIT;
×
1549
  }
1550
  code = 0;
16✔
1551
_EXIT:
2,708✔
1552
  taosMemoryFree(pDst);
2,708!
1553
  TAOS_UNUSED(taosCloseFile(&pFile));
2,708✔
1554
  return code;
2,708✔
1555
}
1556
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
906✔
1557
  int32_t code = 0;
906✔
1558

1559
  TdFilePtr pFile = NULL;
906✔
1560

1561
  char    buf[256] = {0};
906✔
1562
  int32_t nBytes = 0;
906✔
1563

1564
  int32_t len = strlen(pChkpIdDir);
906✔
1565
  if (len == 0) {
906!
1566
    code = TSDB_CODE_INVALID_PARA;
×
1567
    stError("failed to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1568
    return code;
×
1569
  }
1570

1571
  int32_t cap = len + 64;
906✔
1572
  char*   pDst = taosMemoryCalloc(1, cap);
906!
1573
  if (pDst == NULL) {
906!
1574
    code = terrno;
×
1575
    stError("failed to alloc memory to add extra info, dir:%s", pChkpIdDir);
×
1576
    goto _EXIT;
×
1577
  }
1578

1579
  nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
906✔
1580
  if (nBytes <= 0 || nBytes >= cap) {
906!
1581
    code = TSDB_CODE_OUT_OF_RANGE;
×
1582
    stError("failed to build dst to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1583
    goto _EXIT;
×
1584
  }
1585

1586
  pFile = taosOpenFile(pDst, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
906✔
1587
  if (pFile == NULL) {
906!
1588
    code = terrno;
×
1589
    stError("failed to open file to add extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1590
    goto _EXIT;
×
1591
  }
1592

1593
  nBytes = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64, chkpId, processId);
906✔
1594
  if (nBytes <= 0 || nBytes >= sizeof(buf)) {
906!
1595
    code = TSDB_CODE_OUT_OF_RANGE;
×
1596
    stError("failed to build content to add extra info, dir:%s,reason:%s", pChkpIdDir, tstrerror(code));
×
1597
    goto _EXIT;
×
1598
  }
1599

1600
  if (nBytes != taosWriteFile(pFile, buf, nBytes)) {
906!
1601
    code = terrno;
×
1602
    stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1603
    goto _EXIT;
×
1604
  }
1605
  code = 0;
906✔
1606

1607
_EXIT:
906✔
1608
  TAOS_UNUSED(taosCloseFile(&pFile));
906✔
1609
  taosMemoryFree(pDst);
906!
1610
  return code;
906✔
1611
}
1612

1613
/*
 * Write checkpoint chkpId for one task db.
 *
 * arg          - the STaskDbWrapper to checkpoint.
 * chkpId       - id of the checkpoint to create.
 * processedVer - version stored next to chkpId in the checkpoint's "info" file.
 * pList        - out: receives the ids of checkpoints dropped as obsolete.
 *
 * Steps: take a reference on the wrapper, prepare the directories, flush the column families if
 * data was written since the last checkpoint, create the RocksDB checkpoint, write the "info"
 * file, drop obsolete checkpoints, and finally reset dataWritten and record chkpId. If any step
 * fails, the directory of this checkpoint is removed again.
 *
 * Returns 0 on success or the code of the step that failed.
 */
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processedVer, SArray *pList) {
  STaskDbWrapper* pTaskDb = arg;
  int64_t         st = taosGetTimestampMs();
  int32_t         code = 0;
  int64_t         refId = pTaskDb->refId;
  char*           pChkpDir = NULL;
  char*           pChkpIdDir = NULL;
  int32_t         nCf = 0;

  // Declared before the first `goto _EXIT`: the cleanup code frees it on every path.
  rocksdb_column_family_handle_t** ppCf = NULL;

  if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
    code = terrno;
    return code;
  }

  if ((code = chkpPreBuildDir(pTaskDb->path, chkpId, &pChkpDir, &pChkpIdDir)) != 0) {
    goto _EXIT;
  }

  // Get all cf and acquire cfWrappter
  nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
  if (nCf < 0) {
    // A negative value is an error code, not a count.
    code = nCf;
    stError("stream backend:%p failed to get cf handles for checkpoint at:%s", pTaskDb, pChkpIdDir);
    goto _EXIT;
  }
  stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);

  int64_t written = atomic_load_64(&pTaskDb->dataWritten);

  // flush db
  if (written > 0) {
    stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64, pTaskDb, pChkpIdDir, written);
    code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf);
    if (code != 0) goto _EXIT;
  } else {
    stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64, pTaskDb, pChkpIdDir, written);
  }

  // do checkpoint
  if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
    stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
    goto _EXIT;
  } else {
    stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
            taosGetTimestampMs() - st);
  }

  // add extra info to checkpoint
  if ((code = chkpAddExtraInfo(pChkpIdDir, chkpId, processedVer)) != 0) {
    stError("stream backend:%p failed to add extra info to checkpoint at:%s", pTaskDb, pChkpIdDir);
    goto _EXIT;
  }

  // delete ttl checkpoint
  code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir, pList);
  if (code != 0) {
    goto _EXIT;
  }

  TAOS_UNUSED(atomic_store_64(&pTaskDb->dataWritten, 0));
  pTaskDb->chkpId = chkpId;

_EXIT:

  // clear checkpoint dir if failed
  if (code != 0 && pChkpDir != NULL) {
    if (taosDirExist(pChkpIdDir)) {
      taosRemoveDir(pChkpIdDir);
    }
  }
  taosMemoryFree(pChkpIdDir);
  taosMemoryFree(pChkpDir);

  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
  taosMemoryFree(ppCf);
  return code;
}
1685

1686
/*
 * Forward a checkpoint request to taskDbDoCheckpoint().
 *
 * arg        - the task db wrapper to checkpoint.
 * chkpId     - id of the checkpoint to create.
 * processVer - processed version recorded with the checkpoint.
 * pList      - out: ids of checkpoints dropped as obsolete.
 *
 * Returns the result of taskDbDoCheckpoint() unchanged.
 */
int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId, int64_t processVer, SArray* pList) {
  int32_t ret = taskDbDoCheckpoint(arg, chkpId, processVer, pList);
  return ret;
}
1689

1690
SListNode* streamBackendAddCompare(void* backend, void* arg) {
×
1691
  SBackendWrapper* pHandle = (SBackendWrapper*)backend;
×
1692
  SListNode*       node = NULL;
×
1693
  streamMutexLock(&pHandle->mutex);
×
1694
  node = tdListAdd(pHandle->list, arg);
×
1695
  streamMutexUnlock(&pHandle->mutex);
×
1696
  return node;
×
1697
}
1698
void streamBackendDelCompare(void* backend, void* arg) {
×
1699
  SBackendWrapper* pHandle = (SBackendWrapper*)backend;
×
1700
  SListNode*       node = NULL;
×
1701
  streamMutexLock(&pHandle->mutex);
×
1702
  node = tdListPopNode(pHandle->list, arg);
×
1703
  streamMutexUnlock(&pHandle->mutex);
×
1704
  if (node) {
×
1705
    streamStateDestroyCompar(node->data);
×
1706
    taosMemoryFree(node);
×
1707
  }
1708
}
×
1709
/*
 * Release every resource owned by a RocksdbCfInst, then the instance itself.
 *
 * Destroys the per-column-family handles, the per-column-family options and
 * their block-based table options, and the read/write options. One slot per
 * entry of ginitDict is assumed for pHandle, cfOpt and param.
 *
 * A NULL `inst` is accepted and ignored, and a NULL `inst->param` no longer
 * gets dereferenced while the column-family options are being destroyed.
 */
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
  if (inst == NULL) {
    return;
  }

  int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);

  if (inst->pHandle) {
    for (int i = 0; i < cfLen; i++) {
      if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]);
    }
    taosMemoryFree(inst->pHandle);
  }

  if (inst->cfOpt) {
    RocksdbCfParam* params = (RocksdbCfParam*)inst->param;
    for (int i = 0; i < cfLen; i++) {
      rocksdb_options_destroy(inst->cfOpt[i]);
      // table options live in the param array, which may be absent
      if (params != NULL) {
        rocksdb_block_based_options_destroy(params[i].tableOpt);
      }
    }
    taosMemoryFreeClear(inst->cfOpt);
    taosMemoryFreeClear(inst->param);
  }

  if (inst->wOpt) rocksdb_writeoptions_destroy(inst->wOpt);
  if (inst->rOpt) rocksdb_readoptions_destroy(inst->rOpt);

  taosMemoryFree(inst);
}
×
1731

1732
// |key|-----value------|
1733
// |key|ttl|len|userData
1734

1735
/*
 * Default key comparator: plain bytewise ordering.
 *
 * Compares the common prefix of both keys with memcmp(); when the prefixes
 * are equal the shorter key sorts first.
 *
 * state      unused comparator state
 * aBuf/aLen  first key and its length in bytes
 * bBuf/bLen  second key and its length in bytes
 *
 * Returns a negative value, 0, or a positive value when a sorts before,
 * equal to, or after b. On a prefix tie the result is exactly -1, 0 or 1.
 */
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  (void)state;

  // keep the length in size_t: narrowing to int would truncate large lengths
  size_t common = aLen < bLen ? aLen : bLen;
  int    ret = memcmp(aBuf, bBuf, common);
  if (ret != 0) {
    return ret;
  }

  if (aLen < bLen) return -1;
  if (aLen > bLen) return 1;
  return 0;
}
1749
/*
 * Tell whether an encoded value has expired.
 *
 * Decodes the fixed-width int64 at the start of `v` as the expiry timestamp.
 * A timestamp of 0 means "never expires". Returns 1 when the timestamp is
 * non-zero and earlier than taosGetTimestampMs(), otherwise 0.
 */
int streamStateValueIsStale(char* v) {
  int64_t expireAt = 0;
  TAOS_UNUSED(taosDecodeFixedI64(v, &expireAt));

  if (expireAt == 0) {
    return 0;
  }
  return expireAt < taosGetTimestampMs() ? 1 : 0;
}
1754
/*
 * Staleness check for the value the iterator currently points at.
 *
 * Fetches the value with rocksdb_iter_value() and delegates to
 * streamStateValueIsStale(); the reported value length is not used.
 */
int iterValueIsStale(rocksdb_iterator_t* iter) {
  size_t      valLen = 0;
  const char* pVal = rocksdb_iter_value(iter, &valLen);
  return streamStateValueIsStale((char*)pVal);
}
1759
/*
 * Encode a NUL-terminated string key.
 *
 * Copies the characters of `k` (without the terminating NUL) into `buf` and
 * returns the number of bytes written. `buf` is not NUL-terminated here.
 */
int defaultKeyEncode(void* k, char* buf) {
  const char* src = (const char*)k;
  int         nBytes = strlen(src);

  memcpy(buf, src, nBytes);
  return nBytes;
}
1764
/*
 * Decode a string key.
 *
 * Copies strlen(buf) bytes from `buf` into `k` and returns that count. The
 * terminating NUL is not copied, so `k` is only terminated if the caller
 * pre-zeroed it.
 */
int defaultKeyDecode(void* k, char* buf) {
  int nBytes = strlen(buf);

  memcpy(k, buf, nBytes);
  return nBytes;
}
1769
/*
 * Debug helper: render a string key as "key: <text>" into `buf`.
 *
 * Returns the number of characters written, excluding the terminating NUL.
 * The caller must provide a buffer large enough for the result.
 */
int defaultKeyToString(void* k, char* buf) {
  const char* text = (char*)k;
  int         written = sprintf(buf, "key: %s", text);
  return written;
}
1773
//
1774
//  SStateKey
1775
//  |--groupid--|---ts------|--opNum----|
1776
//  |--uint64_t-|-uint64_t--|--int64_t--|
1777
//
1778
//
1779
//
1780
int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
43,286,176✔
1781
  SStateKey key1, key2;
1782
  memset(&key1, 0, sizeof(key1));
43,286,176✔
1783
  memset(&key2, 0, sizeof(key2));
43,286,176✔
1784

1785
  char* p1 = (char*)aBuf;
43,286,176✔
1786
  char* p2 = (char*)bBuf;
43,286,176!
1787

1788
  p1 = taosDecodeFixedU64(p1, &key1.key.groupId);
43,286,176!
1789
  p2 = taosDecodeFixedU64(p2, &key2.key.groupId);
43,286,176!
1790

1791
  p1 = taosDecodeFixedI64(p1, &key1.key.ts);
43,286,176!
1792
  p2 = taosDecodeFixedI64(p2, &key2.key.ts);
43,286,176!
1793

1794
  TAOS_UNUSED(taosDecodeFixedI64(p1, &key1.opNum));
1795
  TAOS_UNUSED(taosDecodeFixedI64(p2, &key2.opNum));
1796

1797
  return stateKeyCmpr(&key1, sizeof(key1), &key2, sizeof(key2));
43,286,176✔
1798
}
1799

1800
int stateKeyEncode(void* k, char* buf) {
437,234✔
1801
  SStateKey* key = k;
437,234✔
1802
  int        len = 0;
437,234✔
1803
  len += taosEncodeFixedU64((void**)&buf, key->key.groupId);
437,234!
1804
  len += taosEncodeFixedI64((void**)&buf, key->key.ts);
437,234!
1805
  len += taosEncodeFixedI32((void**)&buf, key->key.numInGroup);
437,234!
1806
  len += taosEncodeFixedI64((void**)&buf, key->opNum);
437,234!
1807
  return len;
437,234✔
1808
}
1809
int stateKeyDecode(void* k, char* buf) {
13✔
1810
  SStateKey* key = k;
13✔
1811
  int        len = 0;
13✔
1812
  char*      p = buf;
13✔
1813
  p = taosDecodeFixedU64(p, &key->key.groupId);
13!
1814
  p = taosDecodeFixedI64(p, &key->key.ts);
13!
1815
  p = taosDecodeFixedI32(p, &key->key.numInGroup);
13!
1816
  p = taosDecodeFixedI64(p, &key->opNum);
13!
1817
  return p - buf;
13✔
1818
}
1819

1820
int32_t stateKeyToString(void* k, char* buf) {
860,384✔
1821
  SStateKey* key = k;
860,384✔
1822
  int        n = 0;
860,384✔
1823
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->key.groupId);
860,384✔
1824
  n += sprintf(buf + n, "ts:%" PRIi64 ",", key->key.ts);
860,384✔
1825
  n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
860,384✔
1826
  return n;
860,384✔
1827
}
1828

1829
//
1830
// SStateSessionKey
1831
//  |-----------SSessionKey----------|
1832
//  |-----STimeWindow-----|
1833
//  |---skey--|---ekey----|--groupId-|--opNum--|
1834
//  |---int64-|--int64_t--|--uint64--|--int64_t|
1835
// |
1836
//
1837
int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
366,663✔
1838
  SStateSessionKey w1, w2;
1839
  memset(&w1, 0, sizeof(w1));
366,663✔
1840
  memset(&w2, 0, sizeof(w2));
366,663✔
1841

1842
  char* p1 = (char*)aBuf;
366,663✔
1843
  char* p2 = (char*)bBuf;
366,663!
1844

1845
  p1 = taosDecodeFixedI64(p1, &w1.key.win.skey);
366,663!
1846
  p2 = taosDecodeFixedI64(p2, &w2.key.win.skey);
366,663✔
1847

1848
  p1 = taosDecodeFixedI64(p1, &w1.key.win.ekey);
366,663✔
1849
  p2 = taosDecodeFixedI64(p2, &w2.key.win.ekey);
366,663!
1850

1851
  p1 = taosDecodeFixedU64(p1, &w1.key.groupId);
366,663!
1852
  p2 = taosDecodeFixedU64(p2, &w2.key.groupId);
366,663!
1853

1854
  p1 = taosDecodeFixedI64(p1, &w1.opNum);
366,663!
1855
  p2 = taosDecodeFixedI64(p2, &w2.opNum);
366,663✔
1856

1857
  return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
366,663✔
1858
}
1859
int stateSessionKeyEncode(void* k, char* buf) {
7,997✔
1860
  SStateSessionKey* sess = k;
7,997✔
1861
  int               len = 0;
7,997✔
1862
  len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey);
7,997!
1863
  len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey);
7,997!
1864
  len += taosEncodeFixedU64((void**)&buf, sess->key.groupId);
7,997!
1865
  len += taosEncodeFixedI64((void**)&buf, sess->opNum);
7,997!
1866
  return len;
7,997✔
1867
}
1868
int stateSessionKeyDecode(void* k, char* buf) {
3,833✔
1869
  SStateSessionKey* sess = k;
3,833✔
1870
  int               len = 0;
3,833✔
1871

1872
  char* p = buf;
3,833✔
1873
  p = taosDecodeFixedI64(p, &sess->key.win.skey);
3,833!
1874
  p = taosDecodeFixedI64(p, &sess->key.win.ekey);
3,833!
1875
  p = taosDecodeFixedU64(p, &sess->key.groupId);
3,833!
1876
  p = taosDecodeFixedI64(p, &sess->opNum);
3,833!
1877
  return p - buf;
3,833✔
1878
}
1879
int stateSessionKeyToString(void* k, char* buf) {
1,408✔
1880
  SStateSessionKey* key = k;
1,408✔
1881
  int               n = 0;
1,408✔
1882
  n += sprintf(buf + n, "[skey:%" PRIi64 ",", key->key.win.skey);
1,408✔
1883
  n += sprintf(buf + n, "ekey:%" PRIi64 ",", key->key.win.ekey);
1,408✔
1884
  n += sprintf(buf + n, "groupId:%" PRIu64 ",", key->key.groupId);
1,408✔
1885
  n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
1,408✔
1886
  return n;
1,408✔
1887
}
1888

1889
/**
1890
 *  SWinKey
1891
 *  |------groupId------|-----ts------|
1892
 *  |------uint64-------|----int64----|
1893
 */
1894
int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
101,015✔
1895
  SWinKey w1, w2;
1896
  memset(&w1, 0, sizeof(w1));
101,015✔
1897
  memset(&w2, 0, sizeof(w2));
101,015✔
1898

1899
  char* p1 = (char*)aBuf;
101,015✔
1900
  char* p2 = (char*)bBuf;
101,015!
1901

1902
  p1 = taosDecodeFixedU64(p1, &w1.groupId);
101,015!
1903
  p2 = taosDecodeFixedU64(p2, &w2.groupId);
101,015!
1904

1905
  p1 = taosDecodeFixedI64(p1, &w1.ts);
101,015!
1906
  p2 = taosDecodeFixedI64(p2, &w2.ts);
101,015✔
1907

1908
  return winKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
101,015✔
1909
}
1910

1911
int winKeyEncode(void* k, char* buf) {
5,237✔
1912
  SWinKey* key = k;
5,237✔
1913
  int      len = 0;
5,237✔
1914
  len += taosEncodeFixedU64((void**)&buf, key->groupId);
5,237!
1915
  len += taosEncodeFixedI64((void**)&buf, key->ts);
5,237!
1916
  len += taosEncodeFixedI32((void**)&buf, key->numInGroup);
5,237!
1917
  return len;
5,237✔
1918
}
1919

1920
int winKeyDecode(void* k, char* buf) {
3,775✔
1921
  SWinKey* key = k;
3,775✔
1922
  int      len = 0;
3,775✔
1923
  char*    p = buf;
3,775✔
1924
  p = taosDecodeFixedU64(p, &key->groupId);
3,775!
1925
  p = taosDecodeFixedI64(p, &key->ts);
3,775!
1926
  p = taosDecodeFixedI32(p, &key->numInGroup);
3,775!
1927
  return len;
3,775✔
1928
}
1929

1930
int winKeyToString(void* k, char* buf) {
1,252✔
1931
  SWinKey* key = k;
1,252✔
1932
  int      n = 0;
1,252✔
1933
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
1,252✔
1934
  n += sprintf(buf + n, "ts:%" PRIi64 "]", key->ts);
1,252✔
1935
  return n;
1,252✔
1936
}
1937
/*
1938
 * STupleKey
1939
 * |---groupId---|---ts---|---exprIdx---|
1940
 * |---uint64--|---int64--|---int32-----|
1941
 */
1942
int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
11,140✔
1943
  STupleKey w1, w2;
1944
  memset(&w1, 0, sizeof(w1));
11,140✔
1945
  memset(&w2, 0, sizeof(w2));
11,140✔
1946

1947
  char* p1 = (char*)aBuf;
11,140✔
1948
  char* p2 = (char*)bBuf;
11,140!
1949

1950
  p1 = taosDecodeFixedU64(p1, &w1.groupId);
11,140!
1951
  p2 = taosDecodeFixedU64(p2, &w2.groupId);
11,140!
1952

1953
  p1 = taosDecodeFixedI64(p1, &w1.ts);
11,140!
1954
  p2 = taosDecodeFixedI64(p2, &w2.ts);
11,140!
1955

1956
  p1 = taosDecodeFixedI32(p1, &w1.exprIdx);
11,140!
1957
  p2 = taosDecodeFixedI32(p2, &w2.exprIdx);
11,140✔
1958

1959
  return STupleKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
11,140✔
1960
}
1961

1962
int tupleKeyEncode(void* k, char* buf) {
300✔
1963
  STupleKey* key = k;
300✔
1964
  int        len = 0;
300✔
1965
  len += taosEncodeFixedU64((void**)&buf, key->groupId);
300!
1966
  len += taosEncodeFixedI64((void**)&buf, key->ts);
300!
1967
  len += taosEncodeFixedI32((void**)&buf, key->exprIdx);
300!
1968
  return len;
300✔
1969
}
1970
int tupleKeyDecode(void* k, char* buf) {
×
1971
  STupleKey* key = k;
×
1972
  int        len = 0;
×
1973
  char*      p = buf;
×
1974
  p = taosDecodeFixedU64(p, &key->groupId);
×
1975
  p = taosDecodeFixedI64(p, &key->ts);
×
1976
  p = taosDecodeFixedI32(p, &key->exprIdx);
×
1977
  return len;
×
1978
}
1979
int tupleKeyToString(void* k, char* buf) {
100✔
1980
  int        n = 0;
100✔
1981
  STupleKey* key = k;
100✔
1982
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
100✔
1983
  n += sprintf(buf + n, "ts:%" PRIi64 ",", key->ts);
100✔
1984
  n += sprintf(buf + n, "exprIdx:%d]", key->exprIdx);
100✔
1985
  return n;
100✔
1986
}
1987

1988
/*
 * RocksDB comparator for partition keys.
 *
 * Each key is decoded as a single fixed-width int64 and the two values are
 * compared numerically. Returns -1, 0 or 1. The length arguments and
 * `state` are not consulted.
 */
int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  int64_t lhs = 0;
  int64_t rhs = 0;

  TAOS_UNUSED(taosDecodeFixedI64((char*)aBuf, &lhs));
  TAOS_UNUSED(taosDecodeFixedI64((char*)bBuf, &rhs));

  if (lhs < rhs) return -1;
  if (lhs > rhs) return 1;
  return 0;
}
2003
/*
 * Serialize a partition key (one int64 group id pointed to by `k`) into
 * `buf`. Returns the number of bytes written.
 */
int parKeyEncode(void* k, char* buf) {
  int64_t* pGroupId = k;
  return taosEncodeFixedI64((void**)&buf, *pGroupId);
}
2008
/*
 * Deserialize a partition key: reads one fixed-width int64 from `buf` into
 * the int64 pointed to by `k`. Returns the number of bytes consumed.
 */
int parKeyDecode(void* k, char* buf) {
  int64_t* pGroupId = k;
  char*    end = taosDecodeFixedI64(buf, pGroupId);
  return end - buf;
}
2015
/*
 * Debug helper: format a partition key (int64 group id) as "[groupId:G]".
 *
 * Returns the number of characters written, excluding the terminating NUL.
 */
int parKeyToString(void* k, char* buf) {
  int64_t* pGroupId = k;
  return sprintf(buf, "[groupId:%" PRIi64 "]", *pGroupId);
}
2021
int32_t valueToString(void* k, char* buf) {
×
2022
  SStreamValue* key = k;
×
2023
  int           n = 0;
×
2024
  n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp);
×
2025
  n += sprintf(buf + n, "len:%d,", key->len);
×
2026
  n += sprintf(buf + n, "data:%s]", key->data);
×
2027
  return n;
×
2028
}
2029

2030
/*1: stale, 0: no stale*/
2031
int32_t valueIsStale(void* k, int64_t ts) {
×
2032
  SStreamValue* key = k;
×
2033
  if (key->unixTimestamp < ts) {
×
2034
    return 1;
×
2035
  }
2036
  return 0;
×
2037
}
2038

2039
/*
 * Comparator destructor callback. There is nothing to release, so the
 * argument is only marked as unused.
 */
void destroyCompare(void* arg) {
  TAOS_UNUSED(arg);
}
2043

2044
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
436,492✔
2045
  int32_t      code = 0;
436,492✔
2046
  SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)};
436,492✔
2047
  int32_t      len = 0;
436,492✔
2048
  char*        dst = NULL;
436,492✔
2049
  if (vlen > 512) {
436,492✔
2050
    dst = taosMemoryCalloc(1, vlen + 128);
322,738!
2051
    if (dst == NULL) {
322,738!
2052
      return terrno;
×
2053
    }
2054
    int32_t dstCap = vlen + 128;
322,738✔
2055
    int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap);
322,738✔
2056
    if (compressedSize < vlen) {
322,738!
2057
      key.compress = 1;
322,738✔
2058
      key.len = compressedSize;
322,738✔
2059
      value = dst;
322,738✔
2060
    }
2061
  }
2062

2063
  if (*dest == NULL) {
436,492✔
2064
    size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
8,892✔
2065
    char*  p = taosMemoryCalloc(1, size);
8,892!
2066
    if (p == NULL) {
8,892!
2067
      code = terrno;
×
2068
      goto _exception;
×
2069
    }
2070
    char* buf = p;
8,892✔
2071
    len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
8,892!
2072
    len += taosEncodeFixedI32((void**)&buf, key.len);
8,892!
2073
    len += taosEncodeFixedI32((void**)&buf, key.rawLen);
8,892!
2074
    len += taosEncodeFixedI8((void**)&buf, key.compress);
8,892!
2075
    if (value != NULL && key.len != 0) {
8,892✔
2076
      len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
11,658!
2077
    }
2078
    *dest = p;
8,892✔
2079
  } else {
2080
    char* buf = *dest;
427,600✔
2081
    len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
427,600!
2082
    len += taosEncodeFixedI32((void**)&buf, key.len);
427,600!
2083
    len += taosEncodeFixedI32((void**)&buf, key.rawLen);
427,600!
2084
    len += taosEncodeFixedI8((void**)&buf, key.compress);
427,600!
2085
    if (value != NULL && key.len != 0) {
427,600!
2086
      len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
855,200!
2087
    }
2088
  }
2089

2090
  taosMemoryFree(dst);
436,492!
2091
  return len;
436,492✔
2092
_exception:
×
2093
  taosMemoryFree(dst);
×
2094
  return code;
×
2095
}
2096

2097
/*
2098
 *  ret >= 0 : found valid value
2099
 *  ret < 0 : error or timeout
2100
 */
2101
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
7,393✔
2102
  int32_t      code = -1;
7,393✔
2103
  SStreamValue key = {0};
7,393✔
2104
  char*        p = value;
7,393✔
2105

2106
  char* pCompressData = NULL;
7,393✔
2107
  char* pOutput = NULL;
7,393✔
2108
  if (streamStateValueIsStale(p)) {
7,393!
2109
    code = TSDB_CODE_INVALID_DATA_FMT;
×
2110
    goto _EXCEPT;
×
2111
  }
2112

2113
  p = taosDecodeFixedI64(p, &key.unixTimestamp);
7,393!
2114
  p = taosDecodeFixedI32(p, &key.len);
7,393✔
2115
  if (key.len == 0) {
7,393!
2116
    code = 0;
×
2117
    goto _EXCEPT;
×
2118
  }
2119
  if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) {
7,393!
2120
    // compatiable with previous data
2121
    p = taosDecodeBinary(p, (void**)&pOutput, key.len);
×
2122
    if (p == NULL) {
×
2123
      code = terrno;
×
2124
      goto _EXCEPT;
×
2125
    }
2126

2127
  } else {
2128
    p = taosDecodeFixedI32(p, &key.rawLen);
7,393✔
2129
    p = taosDecodeFixedI8(p, &key.compress);
7,393✔
2130
    if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) {
7,393!
2131
      stError("vlen: %d, read len: %d", vlen, key.len);
×
2132
      code = TSDB_CODE_INVALID_DATA_FMT;
×
2133
      goto _EXCEPT;
×
2134
    }
2135
    if (key.compress == 1) {
7,393✔
2136
      p = taosDecodeBinary(p, (void**)&pCompressData, key.len);
1,908!
2137
      if (p == NULL) {
1,908!
2138
        code = terrno;
×
2139
        goto _EXCEPT;
×
2140
      }
2141
      pOutput = taosMemoryCalloc(1, key.rawLen);
1,908!
2142
      if (pOutput == NULL) {
1,908!
2143
        code = terrno;
×
2144
        goto _EXCEPT;
×
2145
      }
2146

2147
      int32_t rawLen = LZ4_decompress_safe(pCompressData, pOutput, key.len, key.rawLen);
1,908✔
2148
      if (rawLen != key.rawLen) {
1,908!
2149
        stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len);
×
2150
        code = TSDB_CODE_INVALID_DATA_FMT;
×
2151
        goto _EXCEPT;
×
2152
      }
2153
      key.len = rawLen;
1,908✔
2154
    } else {
2155
      p = taosDecodeBinary(p, (void**)&pOutput, key.len);
5,485!
2156
      if (p == NULL) {
5,485!
2157
        code = terrno;
×
2158
        goto _EXCEPT;
×
2159
      }
2160
    }
2161
  }
2162

2163
  if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
7,393!
2164

2165
  code = 0;
7,393✔
2166
  if (dest) {
7,393✔
2167
    *dest = pOutput;
6,894✔
2168
    pOutput = NULL;
6,894✔
2169
  }
2170
  taosMemoryFree(pCompressData);
7,393!
2171
  taosMemoryFree(pOutput);
7,393!
2172
  return key.len;
7,393✔
2173

2174
_EXCEPT:
×
2175
  if (dest != NULL) *dest = NULL;
×
2176
  if (ttl != NULL) *ttl = 0;
×
2177

2178
  taosMemoryFree(pOutput);
×
2179
  taosMemoryFree(pCompressData);
×
2180
  return code;
×
2181
}
2182

2183
const char* compareDefaultName(void* arg) {
41,032✔
2184
  TAOS_UNUSED(arg);
2185
  return ginitDict[0].key;
41,032✔
2186
}
2187
const char* compareStateName(void* arg) {
12,822✔
2188
  TAOS_UNUSED(arg);
2189
  return ginitDict[1].key;
12,822✔
2190
}
2191
const char* compareWinKeyName(void* arg) { return ginitDict[2].key; }
2,676✔
2192
const char* compareSessionKeyName(void* arg) {
3,220✔
2193
  TAOS_UNUSED(arg);
2194
  return ginitDict[3].key;
3,220✔
2195
}
2196
const char* compareFuncKeyName(void* arg) {
15✔
2197
  TAOS_UNUSED(arg);
2198
  return ginitDict[4].key;
15✔
2199
}
2200
const char* compareParKeyName(void* arg) {
5,151✔
2201
  TAOS_UNUSED(arg);
2202
  return ginitDict[5].key;
5,151✔
2203
}
2204
const char* comparePartagKeyName(void* arg) {
936✔
2205
  TAOS_UNUSED(arg);
2206
  return ginitDict[6].key;
936✔
2207
}
2208

2209
/*
 * Destructor callback for the compaction filter factories. It performs no
 * work for either a NULL or a non-NULL argument.
 */
void destroyCompactFilteFactory(void* arg) {
  (void)arg;
}
2212
/*
 * Name callbacks for the compaction filter factories. Each returns a fixed
 * string literal; the factory state passed in `arg` is not used.
 */

/* Factory name for the default column family. */
const char* compactFilteFactoryName(void* arg) {
  (void)arg;
  return "stream_compact_factory_filter_default";
}

/* Factory name for the session column family. */
const char* compactFilteFactoryNameSess(void* arg) {
  (void)arg;
  return "stream_compact_factory_filter_sess";
}

/* Factory name for the state column family. */
const char* compactFilteFactoryNameState(void* arg) {
  (void)arg;
  return "stream_compact_factory_filter_state";
}

/* Factory name for the fill column family. */
const char* compactFilteFactoryNameFill(void* arg) {
  (void)arg;
  return "stream_compact_factory_filter_fill";
}

/* Factory name for the func column family. */
const char* compactFilteFactoryNameFunc(void* arg) {
  (void)arg;
  return "stream_compact_factory_filter_func";
}
2232

2233
/*
 * Destructor callback for compaction filters; there is nothing to release.
 */
void destroyCompactFilte(void* arg) {
  TAOS_UNUSED(arg);
}
151✔
2234
/*
 * Default compaction filter callback.
 *
 * Returns 1 when streamStateValueIsStale() reports the value as expired and
 * 0 otherwise. Only `val` is inspected; the key, the lengths and the
 * new-value outputs are left untouched.
 */
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                           char** newval, size_t* newvlen, unsigned char* value_changed) {
  if (streamStateValueIsStale((char*)val)) {
    return 1;
  }
  return 0;
}
2238
/*
 * Name callbacks for the compaction filters. Each returns a fixed string
 * literal; `arg` is not used.
 */
const char* compactFilteName(void* arg) {
  (void)arg;
  return "stream_filte_default";
}

const char* compactFilteNameSess(void* arg) {
  (void)arg;
  return "stream_filte_sess";
}

const char* compactFilteNameState(void* arg) {
  (void)arg;
  return "stream_filte_state";
}

const char* compactFilteNameFill(void* arg) {
  (void)arg;
  return "stream_filte_fill";
}

const char* compactFilteNameFunc(void* arg) {
  (void)arg;
  return "stream_filte_func";
}
×
2243

2244
/*
 * Compaction filter callbacks for the session, state, fill and func column
 * families. None of them is implemented yet: every callback returns 0 and
 * touches none of its arguments.
 */

/* Session column family: not implemented, always returns 0. */
unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
  const unsigned char verdict = 0;
  return verdict;
}

/* State column family: not implemented, always returns 0. */
unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                                char** newval, size_t* newvlen, unsigned char* value_changed) {
  const unsigned char verdict = 0;
  return verdict;
}

/* Fill column family: not implemented, always returns 0. */
unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
  const unsigned char verdict = 0;
  return verdict;
}

/* Func column family: not implemented, always returns 0. */
unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
  const unsigned char verdict = 0;
  return verdict;
}
2268

2269
/*
 * Filter-creation callbacks for the compaction filter factories.
 *
 * Each one builds a rocksdb compaction filter that carries the factory
 * state `arg`, uses destroyCompactFilte() as destructor and pairs the
 * matching filter function with its name callback. `ctx` is not used.
 */

/* Default column family filter. */
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  return rocksdb_compactionfilter_create(arg, destroyCompactFilte, compactFilte, compactFilteName);
}

/* Session column family filter. */
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  return rocksdb_compactionfilter_create(arg, destroyCompactFilte, compactFilteSess, compactFilteNameSess);
}

/* State column family filter. */
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  return rocksdb_compactionfilter_create(arg, destroyCompactFilte, compactFilteState, compactFilteNameState);
}

/* Fill column family filter. */
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  return rocksdb_compactionfilter_create(arg, destroyCompactFilte, compactFilteFill, compactFilteNameFill);
}

/* Func column family filter. */
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  return rocksdb_compactionfilter_create(arg, destroyCompactFilte, compactFilteFunc, compactFilteNameFunc);
}
2299

2300
/*
 * Open the RocksDB instance at `path` with the given column families and
 * attach the resulting handles to `pTask`.
 *
 * pTask     wrapper that supplies dbOpt / pCfOpts and receives db / pCf
 * path      database directory
 * pCfNames  names of the column families to open
 * nCf       number of entries in pCfNames
 *
 * For every name, getCfIdx() maps it to the slot used in pTask->pCfOpts and
 * pTask->pCf.
 * NOTE(review): the index returned by getCfIdx() is used unchecked; confirm
 * it cannot be negative for names that reach this function.
 *
 * Returns 0 on success, terrno when a temporary array cannot be allocated,
 * or -1 when RocksDB reports an error (the message is logged).
 *
 * Both temporary arrays are released on every path; previously `cfOpts`
 * leaked when the `cfHandle` allocation failed.
 */
int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) {
  int32_t                          code = -1;
  char*                            err = NULL;
  rocksdb_t*                       db = NULL;
  rocksdb_options_t**              cfOpts = NULL;
  rocksdb_column_family_handle_t** cfHandle = NULL;

  cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
  if (cfOpts == NULL) {
    code = terrno;
    goto _EXIT;
  }
  cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
  if (cfHandle == NULL) {
    code = terrno;  // captured before cleanup; cfOpts is freed at _EXIT
    goto _EXIT;
  }

  for (int i = 0; i < nCf; i++) {
    int32_t idx = getCfIdx(pCfNames[i]);
    cfOpts[i] = pTask->pCfOpts[idx];
  }

  db = rocksdb_open_column_families(pTask->dbOpt, path, nCf, (const char* const*)pCfNames,
                                    (const rocksdb_options_t* const*)cfOpts, cfHandle, &err);

  if (err != NULL) {
    stError("failed to open cf path: %s", err);
    taosMemoryFree(err);
    goto _EXIT;
  }

  // publish the handles under their ginitDict slot, not their open order
  for (int i = 0; i < nCf; i++) {
    int32_t idx = getCfIdx(pCfNames[i]);
    pTask->pCf[idx] = cfHandle[i];
  }

  pTask->db = db;
  code = 0;

_EXIT:
  taosMemoryFree(cfOpts);
  taosMemoryFree(cfHandle);
  return code;
}
2340

2341
void* taskDbAddRef(void* pTaskDb) {
941✔
2342
  STaskDbWrapper* pBackend = pTaskDb;
941✔
2343
  return taosAcquireRef(taskDbWrapperId, pBackend->refId);
941✔
2344
}
2345

2346
void taskDbRemoveRef(void* pTaskDb) {
3,606✔
2347
  if (pTaskDb == NULL) {
3,606!
2348
    return;
×
2349
  }
2350

2351
  STaskDbWrapper* pBackend = pTaskDb;
3,606✔
2352
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, pBackend->refId));
3,606✔
2353
}
2354

2355
void taskDbSetClearFileFlag(void* pTaskDb) {
1,386✔
2356
  if (pTaskDb == NULL) {
1,386!
2357
    return;
×
2358
  }
2359

2360
  STaskDbWrapper* pBackend = pTaskDb;
1,386✔
2361
  atomic_store_8(&pBackend->removeAllFiles, 1);
1,386✔
2362
}
2363

2364
/*
 * Build all RocksDB option objects for a task DB and store them in the
 * wrapper.
 *
 * Database level: default env, an LRU cache created with capacity argument
 * 256, create-if-missing for the DB and its column families, up to 3 write
 * buffers, info log level 1, a 256 MiB DB-wide and 128 MiB per-buffer write
 * buffer size, atomic flush, the default compaction filter factory, and
 * read/write options with the WAL disabled.
 *
 * Column-family level (one per ginitDict entry): a copy of the DB options
 * with block-based table options sharing the cache, partitioned filters, a
 * bloom filter policy created with argument 15, and the comparator and
 * compaction filter factory described by the ginitDict entry.
 *
 * If any per-column-family array cannot be allocated, the failure is logged,
 * all four arrays are freed and reset, and the function returns with only
 * the database-level options set. No status is reported to the caller.
 */
void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
  rocksdb_env_t* pEnv = rocksdb_create_default_env();

  rocksdb_cache_t*   pLru = rocksdb_cache_create_lru(256);
  rocksdb_options_t* pDbOpt = rocksdb_options_create();
  rocksdb_options_set_env(pDbOpt, pEnv);
  rocksdb_options_set_create_if_missing(pDbOpt, 1);
  rocksdb_options_set_create_missing_column_families(pDbOpt, 1);
  rocksdb_options_set_max_write_buffer_number(pDbOpt, 3);
  rocksdb_options_set_info_log_level(pDbOpt, 1);
  rocksdb_options_set_db_write_buffer_size(pDbOpt, 256 << 20);
  rocksdb_options_set_write_buffer_size(pDbOpt, 128 << 20);
  rocksdb_options_set_atomic_flush(pDbOpt, 1);

  pTaskDb->dbOpt = pDbOpt;
  pTaskDb->env = pEnv;
  pTaskDb->cache = pLru;
  pTaskDb->filterFactory = rocksdb_compactionfilterfactory_create(
      NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
  rocksdb_options_set_compaction_filter_factory(pTaskDb->dbOpt, pTaskDb->filterFactory);
  pTaskDb->readOpt = rocksdb_readoptions_create();
  pTaskDb->writeOpt = rocksdb_writeoptions_create();
  rocksdb_writeoptions_disable_WAL(pTaskDb->writeOpt, 1);

  // one slot per known column family
  size_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
  pTaskDb->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
  pTaskDb->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
  pTaskDb->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
  pTaskDb->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));

  if (!(pTaskDb->pCf != NULL && pTaskDb->pCfParams != NULL && pTaskDb->pCfOpts != NULL &&
        pTaskDb->pCompares != NULL)) {
    stError("failed to alloc memory for cf");
    taosMemoryFreeClear(pTaskDb->pCf);
    taosMemoryFreeClear(pTaskDb->pCfParams);
    taosMemoryFreeClear(pTaskDb->pCfOpts);
    taosMemoryFreeClear(pTaskDb->pCompares);
    return;
  }

  for (size_t cf = 0; cf < nCf; cf++) {
    rocksdb_options_t*                   pCfOpt = rocksdb_options_create_copy(pTaskDb->dbOpt);
    rocksdb_block_based_table_options_t* pTableOpt = rocksdb_block_based_options_create();
    rocksdb_block_based_options_set_block_cache(pTableOpt, pTaskDb->cache);
    rocksdb_block_based_options_set_partition_filters(pTableOpt, 1);

    rocksdb_filterpolicy_t* pBloom = rocksdb_filterpolicy_create_bloom(15);
    rocksdb_block_based_options_set_filter_policy(pTableOpt, pBloom);

    rocksdb_options_set_block_based_table_factory(pCfOpt, pTableOpt);

    SCfInit* pInit = &ginitDict[cf];

    rocksdb_comparator_t* pCmp = rocksdb_comparator_create(NULL, pInit->destroyCmp, pInit->cmpKey, pInit->cmpName);
    rocksdb_options_set_comparator(pCfOpt, pCmp);

    rocksdb_compactionfilterfactory_t* pFactory =
        rocksdb_compactionfilterfactory_create(NULL, pInit->destroyFilter, pInit->createFilter, pInit->funcName);
    rocksdb_options_set_compaction_filter_factory(pCfOpt, pFactory);

    // keep the objects so they can be destroyed with the wrapper
    pTaskDb->pCompares[cf] = pCmp;
    pTaskDb->pCfOpts[cf] = pCfOpt;
    pTaskDb->pCfParams[cf].tableOpt = pTableOpt;
  }
}
2431
/*
 * Initialise the checkpoint bookkeeping of a task DB wrapper.
 *
 * Sets chkpId to -1 and chkpCap to 4, creates the chkpSaved array, calls
 * taskDbLoadChkpInfo() (result ignored), creates the chkpInUse array and
 * initialises the chkpDirLock rwlock. Allocation results are not checked.
 */
void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) {
  enum { kInitialSlots = 4 };  // initial capacity of both id arrays

  pTaskDb->chkpId = -1;
  pTaskDb->chkpCap = 4;

  // chkpSaved is created before checkpoint info is loaded
  pTaskDb->chkpSaved = taosArrayInit(kInitialSlots, sizeof(int64_t));
  TAOS_UNUSED(taskDbLoadChkpInfo(pTaskDb));

  pTaskDb->chkpInUse = taosArrayInit(kInitialSlots, sizeof(int64_t));

  TAOS_UNUSED(taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL));
}
2,705✔
2441

2442
/*
 * Mark checkpoint `chkp` as in use.
 *
 * Under the chkpDirLock write lock the id is appended to chkpInUse and the
 * array is re-sorted with chkpIdComp. A failed append is only logged.
 */
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
  TAOS_UNUSED(taosThreadRwlockWrlock(&pTaskDb->chkpDirLock));

  void* pSlot = taosArrayPush(pTaskDb->chkpInUse, &chkp);
  if (pSlot == NULL) {
    stError("failed to push chkp: %" PRIi64 " into inuse", chkp);
  }
  taosArraySort(pTaskDb->chkpInUse, chkpIdComp);

  TAOS_UNUSED(taosThreadRwlockUnlock(&pTaskDb->chkpDirLock));
}
3✔
2450

2451
/*
 * Drop one in-use mark for checkpoint `chkp`.
 *
 * Under the chkpDirLock write lock the first chkpInUse entry equal to `chkp`
 * is removed; nothing happens when the id is not present.
 */
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
  TAOS_UNUSED(taosThreadRwlockWrlock(&pTaskDb->chkpDirLock));

  int32_t nInUse = taosArrayGetSize(pTaskDb->chkpInUse);
  int     pos = 0;
  while (pos < nInUse) {
    int64_t* pId = taosArrayGet(pTaskDb->chkpInUse, pos);
    if (*pId == chkp) {
      taosArrayRemove(pTaskDb->chkpInUse, pos);
      break;
    }
    pos++;
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&pTaskDb->chkpDirLock));
}
3✔
2463

2464
/*
 * Tear down the checkpoint bookkeeping of a wrapper: both checkpoint id arrays are destroyed and
 * chkpDirLock is destroyed afterwards. The pointers inside the wrapper are not reset.
 */
void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
  // saved checkpoint ids
  taosArrayDestroy(pTaskDb->chkpSaved);

  // checkpoint ids currently referenced
  taosArrayDestroy(pTaskDb->chkpInUse);

  TAOS_UNUSED(taosThreadRwlockDestroy(&pTaskDb->chkpDirLock));
}
2,665✔
2469

2470
int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
×
2471
  int32_t code = 0;
×
2472
  int32_t cap = strlen(path) + 128, nBytes = 0;
×
2473
  char*   statePath = NULL;
×
2474
  char*   dbPath = NULL;
×
2475

2476
  statePath = taosMemoryCalloc(1, cap);
×
2477
  if (statePath == NULL) {
×
2478
    TAOS_CHECK_GOTO(terrno, NULL, _err);
×
2479
  }
2480

2481
  nBytes = snprintf(statePath, cap, "%s%s%s", path, TD_DIRSEP, key);
×
2482
  if (nBytes < 0 || nBytes >= cap) {
×
2483
    code = TSDB_CODE_OUT_OF_RANGE;
×
2484
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2485
  }
2486

2487
  if (!taosDirExist(statePath)) {
×
2488
    code = taosMulMkDir(statePath);
×
2489
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2490
  }
2491

2492
  dbPath = taosMemoryCalloc(1, cap);
×
2493
  if (dbPath == NULL) {
×
2494
    TAOS_CHECK_GOTO(terrno, NULL, _err);
×
2495
  }
2496
  nBytes = snprintf(dbPath, cap, "%s%s%s", statePath, TD_DIRSEP, "state");
×
2497
  if (nBytes < 0 || nBytes >= cap) {
×
2498
    code = TSDB_CODE_OUT_OF_RANGE;
×
2499
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2500
  }
2501

2502
  if (!taosDirExist(dbPath)) {
×
2503
    code = taosMulMkDir(dbPath);
×
2504
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2505
  }
2506

2507
  *dbFullPath = dbPath;
×
2508
  *stateFullPath = statePath;
×
2509
  return 0;
×
2510
_err:
×
2511
  stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code));
×
2512

2513
  taosMemoryFree(statePath);
×
2514
  taosMemoryFree(dbPath);
×
2515
  return code;
×
2516
}
2517

2518
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
×
2519
  STaskDbWrapper* p = pTaskDb;
×
2520
  TAOS_UNUSED(streamMutexLock(&p->mutex));
×
2521
  p->chkpId = chkpId;
×
2522
  TAOS_UNUSED(streamMutexUnlock(&p->mutex));
×
2523
}
×
2524

2525
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
2,705✔
2526
  char*   err = NULL;
2,705✔
2527
  char**  cfNames = NULL;
2,705✔
2528
  size_t  nCf = 0;
2,705✔
2529
  int32_t code = 0;
2,705✔
2530
  int32_t lino = 0;
2,705✔
2531

2532
  STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper));
2,705!
2533
  TSDB_CHECK_NULL(pTaskDb, code, lino, _EXIT, terrno);
2,705!
2534

2535
  pTaskDb->idstr = key ? taosStrdup(key) : NULL;
2,705!
2536
  pTaskDb->path = statePath ? taosStrdup(statePath) : NULL;
2,705!
2537

2538
  code = taosThreadMutexInit(&pTaskDb->mutex, NULL);
2,705✔
2539
  TSDB_CHECK_CODE(code, lino, _EXIT);
2,705!
2540

2541
  taskDbInitChkpOpt(pTaskDb);
2,705✔
2542
  taskDbInitOpt(pTaskDb);
2,705✔
2543

2544
  cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
2,705✔
2545
  if (nCf == 0) {
2,700✔
2546
    stInfo("%s newly create db in state-backend", key);
2,687!
2547
    // pre create db
2548
    pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
2,688✔
2549
    if (pTaskDb->db == NULL) {
2,692!
2550
      stError("%s open state-backend failed, reason:%s", key, err);
×
2551
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
2552
      goto _EXIT;
×
2553
    }
2554

2555
    rocksdb_close(pTaskDb->db);
2,692✔
2556
    pTaskDb->db = NULL;
2,692✔
2557

2558
    if (cfNames != NULL) {
2,692!
2559
      rocksdb_list_column_families_destroy(cfNames, nCf);
2,692✔
2560
    }
2561

2562
    taosMemoryFree(err);
2,692!
2563
    err = NULL;
2,692✔
2564

2565
    cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
2,692✔
2566
    if (err != NULL) {
2,692!
2567
      stError("%s failed to create column-family, %s, %" PRIzu ", reason:%s", key, dbPath, nCf, err);
×
2568
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
2569
      goto _EXIT;
×
2570
    }
2571
  }
2572

2573
  if ((code = taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf)) != 0) {
2,705!
2574
    goto _EXIT;
×
2575
  }
2576

2577
  if (cfNames != NULL) {
2,705!
2578
    rocksdb_list_column_families_destroy(cfNames, nCf);
2,705✔
2579
    cfNames = NULL;
2,705✔
2580
  }
2581

2582
  stDebug("init s-task backend in:%s, backend:%p, %s", dbPath, pTaskDb, key);
2,705✔
2583
  return pTaskDb;
2,705✔
2584

2585
_EXIT:
×
2586
  stError("%s taskDb open failed, %s at line:%d code:%s", key, __func__, lino, tstrerror(code));
×
2587

2588
  taskDbDestroy(pTaskDb, false);
×
2589
  if (err) taosMemoryFree(err);
×
2590
  if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);
×
2591
  return NULL;
×
2592
}
2593

2594
/*
 * Restore checkpoint data for a task and open its backend.
 *
 * @param path        base directory handed to restoreCheckpointData()
 * @param key         task identifier
 * @param chkptId     checkpoint id handed to restoreCheckpointData()
 * @param processVer  in/out: passed to restoreCheckpointData(); overwritten with the version read
 *                    by chkpLoadExtraInfo() on success
 * @param ppTaskDb    out: the opened wrapper, or NULL when opening failed after the restore step
 * @return 0 on success; the negative code of restoreCheckpointData(), TSDB_CODE_INVALID_PARA when
 *         taskDbOpenImpl() returns NULL, or the code of chkpLoadExtraInfo()
 *
 * The state and db path strings produced by the restore step are released on every path that
 * follows it, including the chkpLoadExtraInfo() failure path.
 */
int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb) {
  char* statePath = NULL;
  char* dbPath = NULL;
  int   code = 0;
  if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) {
    stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key,
            chkptId, tstrerror(code));
    return code;
  }

  STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath);
  if (pTaskDb == NULL) {
    code = TSDB_CODE_INVALID_PARA;
  } else {
    int64_t chkpId = -1, ver = -1;
    code = chkpLoadExtraInfo(dbPath, &chkpId, &ver);
    if (code == 0) {
      *processVer = ver;
    } else {
      stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId,
              tstrerror(code));
      taskDbDestroy(pTaskDb, false);
      // never hand a destroyed wrapper back to the caller
      pTaskDb = NULL;
    }
  }

  taosMemoryFree(dbPath);
  taosMemoryFree(statePath);
  *ppTaskDb = pTaskDb;
  return code;
}
2624

2625
void taskDbDestroy(void* pDb, bool flush) {
2,665✔
2626
  STaskDbWrapper* wrapper = pDb;
2,665✔
2627
  if (wrapper == NULL) {
2,665!
2628
    return;
×
2629
  }
2630

2631
  int64_t st = taosGetTimestampMs();
2,665✔
2632
  streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr);
2,665✔
2633

2634
  stDebug("%s succ to destroy stream backend:%p", wrapper->idstr, wrapper);
2,665✔
2635

2636
  int8_t nCf = tListLen(ginitDict);
2,665✔
2637
  if (flush && wrapper->removeAllFiles == 0) {
2,665!
2638
    if (wrapper->db && wrapper->pCf) {
1,279!
2639
      rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
1,279✔
2640
      rocksdb_flushoptions_set_wait(flushOpt, 1);
1,279✔
2641

2642
      char*                            err = NULL;
1,279✔
2643
      rocksdb_column_family_handle_t** cfs = taosMemoryCalloc(1, sizeof(rocksdb_column_family_handle_t*) * nCf);
1,279!
2644
      int                              numOfFlushCf = 0;
1,279✔
2645
      for (int i = 0; i < nCf; i++) {
10,232✔
2646
        if (wrapper->pCf[i] != NULL) {
8,953✔
2647
          cfs[numOfFlushCf++] = wrapper->pCf[i];
2,874✔
2648
        }
2649
      }
2650
      if (numOfFlushCf != 0) {
1,279!
2651
        rocksdb_flush_cfs(wrapper->db, flushOpt, cfs, numOfFlushCf, &err);
1,279✔
2652
        if (err != NULL) {
1,279!
2653
          stError("failed to flush all cfs, reason:%s", err);
×
2654
          taosMemoryFreeClear(err);
×
2655
        }
2656
      }
2657
      taosMemoryFree(cfs);
1,279!
2658
      rocksdb_flushoptions_destroy(flushOpt);
1,279✔
2659
    }
2660
  }
2661

2662
  if (wrapper->pCf != NULL) {
2,665!
2663
    for (int i = 0; i < nCf; i++) {
21,308✔
2664
      if (wrapper->pCf[i] != NULL) {
18,648✔
2665
        rocksdb_column_family_handle_destroy(wrapper->pCf[i]);
6,160✔
2666
      }
2667
    }
2668
  }
2669

2670
  if (wrapper->db) {
2,660!
2671
    rocksdb_close(wrapper->db);
2,665✔
2672
    wrapper->db = NULL;
2,663✔
2673
  }
2674

2675
  rocksdb_options_destroy(wrapper->dbOpt);
2,658✔
2676
  rocksdb_readoptions_destroy(wrapper->readOpt);
2,665✔
2677
  rocksdb_writeoptions_destroy(wrapper->writeOpt);
2,664✔
2678
  rocksdb_env_destroy(wrapper->env);
2,664✔
2679
  rocksdb_cache_destroy(wrapper->cache);
2,665✔
2680

2681
  taosMemoryFree(wrapper->pCf);
2,665!
2682
  for (int i = 0; i < nCf; i++) {
21,317✔
2683
    rocksdb_options_t*                   opt = wrapper->pCfOpts[i];
18,652✔
2684
    rocksdb_comparator_t*                compare = wrapper->pCompares[i];
18,652✔
2685
    rocksdb_block_based_table_options_t* tblOpt = wrapper->pCfParams[i].tableOpt;
18,652✔
2686

2687
    rocksdb_options_destroy(opt);
18,652✔
2688
    rocksdb_comparator_destroy(compare);
18,651✔
2689
    rocksdb_block_based_options_destroy(tblOpt);
18,651✔
2690
  }
2691

2692
  taosMemoryFree(wrapper->pCompares);
2,665!
2693
  taosMemoryFree(wrapper->pCfOpts);
2,665!
2694
  taosMemoryFree(wrapper->pCfParams);
2,665!
2695

2696
  streamMutexDestroy(&wrapper->mutex);
2,665✔
2697
  taskDbDestroyChkpOpt(wrapper);
2,665✔
2698

2699
  if (wrapper->removeAllFiles) {
2,665✔
2700
    char* err = NULL;
1,386✔
2701
    stInfo("drop task remove backend data:%s", wrapper->path);
1,386!
2702
    taosRemoveDir(wrapper->path);
1,386✔
2703
  }
2704

2705
  int64_t et = taosGetTimestampMs();
2,665✔
2706
  stDebug("%s destroy stream backend:%p completed, elapsed time:%.2fs", wrapper->idstr, wrapper, (et - st) / 1000.0);
2,665✔
2707

2708
  taosMemoryFree(wrapper->idstr);
2,665!
2709
  taosMemoryFree(wrapper->path);
2,665!
2710
  taosMemoryFree(wrapper);
2,665!
2711
}
2712

2713
/* Single-argument form of taskDbDestroy() that always requests a flush. */
void taskDbDestroy2(void* pDb) {
  const bool doFlush = true;
  taskDbDestroy(pDb, doFlush);
}
2,665✔
2714

2715
/*
 * Locate the on-disk directory of checkpoint `chkpId` for an rsync upload.
 *
 * The directory looked up is <pDb->path>/checkpoints/checkpoint<chkpId>. A reference on the wrapper
 * (taskDbWrapperId / pDb->refId) is held for the duration of the call.
 *
 * @param pDb     task db wrapper
 * @param chkpId  checkpoint id
 * @param path    out: heap string with the directory, written only when the directory exists;
 *                the caller then owns it
 * @return 0 on success, terrno when the reference or the buffer cannot be obtained,
 *         TSDB_CODE_OUT_OF_RANGE when the path does not fit
 *
 * NOTE: 0 is also returned when the directory does not exist; `*path` is left untouched then.
 */
int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
  int32_t code = 0;
  int64_t refId = pDb->refId;
  int32_t nBytes = 0;
  int32_t cap = 0;
  char*   buf = NULL;

  if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
    code = terrno;
    return code;
  }

  cap = strlen(pDb->path) + 128;

  buf = taosMemoryCalloc(1, cap);
  if (buf == NULL) {
    // read terrno now: releasing the reference below may overwrite it
    code = terrno;
    goto _exit;
  }

  nBytes =
      snprintf(buf, cap, "%s%s%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _exit;
  }

  if (taosIsDir(buf)) {
    *path = buf;
    buf = NULL;  // ownership moved to the caller
  }

_exit:
  taosMemoryFree(buf);
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
  return code;
}
2751

2752
/*
 * Prepare the staging directory <pDb->path>/tmp<chkpId> for an S3 upload and ask the backend
 * manager for the checkpoint delta.
 *
 * An existing staging directory is emptied with cleanDir(); otherwise it is created.
 *
 * @param pDb         task db wrapper
 * @param bkdChkpMgt  SBkdMgt* handed to bkdMgtGetDelta()
 * @param chkpId      checkpoint id
 * @param path        out: heap string with the staging directory; set whenever bkdMgtGetDelta()
 *                    is reached, even if it fails, so the caller owns it in both cases
 * @param list        handed to bkdMgtGetDelta()
 * @param idStr       identifier handed to cleanDir()
 * @return the result of bkdMgtGetDelta(); terrno on allocation failure, TSDB_CODE_OUT_OF_RANGE when
 *         the path does not fit, or the system error of a failed taosMkDir()
 */
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list,
                                    const char* idStr) {
  int32_t  code = 0;
  int32_t  cap = strlen(pDb->path) + 32;
  SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;

  char* temp = taosMemoryCalloc(1, cap);
  if (temp == NULL) {
    return terrno;
  }

  int32_t nBytes = snprintf(temp, cap, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    taosMemoryFree(temp);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  if (taosDirExist(temp)) {
    cleanDir(temp, idStr);
  } else {
    code = taosMkDir(temp);
    if (code != 0) {
      // translate errno before freeing: the release call may disturb it
      code = TAOS_SYSTEM_ERROR(ERRNO);
      taosMemoryFree(temp);
      return code;
    }
  }

  code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp);
  *path = temp;

  return code;
}
2784

2785
/*
 * Dispatch checkpoint upload preparation by backup type.
 *
 * The checkpoint id is marked in-use (taskDbRefChkp) for the duration of the call and released
 * afterwards (taskDbUnRefChkp).
 *
 * @param arg   STaskDbWrapper*
 * @param type  ECHECKPOINT_BACKUP_TYPE value; DATA_UPLOAD_RSYNC and DATA_UPLOAD_S3 are handled
 * @return the result of the selected helper, or -1 for any other type
 */
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list,
                                const char* idStr) {
  STaskDbWrapper*         pWrapper = arg;
  ECHECKPOINT_BACKUP_TYPE backupType = type;
  int32_t                 ret = -1;

  taskDbRefChkp(pWrapper, chkpId);
  switch (backupType) {
    case DATA_UPLOAD_RSYNC:
      ret = taskDbGenChkpUploadData__rsync(pWrapper, chkpId, path);
      break;
    case DATA_UPLOAD_S3:
      ret = taskDbGenChkpUploadData__s3(pWrapper, mgt, chkpId, path, list, idStr);
      break;
    default:
      // unknown type: keep the -1 default
      break;
  }
  taskDbUnRefChkp(pWrapper, chkpId);

  return ret;
}
2800

2801
/*
 * Make sure the column family identified by `key` is open in `pDb`.
 *
 * @return 0 when the column family was already open or has just been created;
 *         -1 when getCfIdx() does not know `key` or RocksDB reports an error
 */
int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) {
  char*  errMsg = NULL;
  int8_t slot = getCfIdx(key);

  if (slot == -1) {
    return -1;
  }

  // already opened earlier, nothing to do
  if (pDb->pCf[slot] != NULL) {
    return 0;
  }

  rocksdb_column_family_handle_t* pHandle =
      rocksdb_create_column_family(pDb->db, pDb->pCfOpts[slot], ginitDict[slot].key, &errMsg);
  if (errMsg != NULL) {
    stError("failed to open cf, key:%s, reason: %s", key, errMsg);
    taosMemoryFree(errMsg);
    return -1;
  }

  pDb->pCf[slot] = pHandle;
  return 0;
}
2822
/*
 * Copy every key/value pair of column family `i` from `pSrc` into the same column family of
 * `pDst`.
 *
 * Pairs are collected in a write batch that is written to `pDst` each time it already holds 1024
 * entries, and once more after the scan if anything is left in it.
 *
 * @return 0 on success, -1 as soon as a rocksdb_write() reports an error
 */
int32_t copyDataAt(RocksdbCfInst* pSrc, STaskDbWrapper* pDst, int8_t i) {
  int32_t batchLimit = 1024;
  char*   errMsg = NULL;
  int     rc = 0;

  rocksdb_readoptions_t* readOpts = rocksdb_readoptions_create();
  rocksdb_writebatch_t*  batch = rocksdb_writebatch_create();
  rocksdb_iterator_t*    it = rocksdb_create_iterator_cf(pSrc->db, readOpts, pSrc->pHandle[i]);

  for (rocksdb_iter_seek_to_first(it); rocksdb_iter_valid(it); rocksdb_iter_next(it)) {
    // drain a full batch before queueing the current pair
    if (rocksdb_writebatch_count(batch) >= batchLimit) {
      rocksdb_write(pDst->db, pDst->writeOpt, batch, &errMsg);
      if (errMsg != NULL) {
        rc = -1;
        goto _EXIT;
      }
      rocksdb_writebatch_clear(batch);
    }

    size_t keyLen = 0;
    size_t valLen = 0;
    char*  pKey = (char*)rocksdb_iter_key(it, &keyLen);
    char*  pVal = (char*)rocksdb_iter_value(it, &valLen);

    rocksdb_writebatch_put_cf(batch, pDst->pCf[i], pKey, keyLen, pVal, valLen);
  }

  // write whatever is left over
  if (rocksdb_writebatch_count(batch) > 0) {
    rocksdb_write(pDst->db, pDst->writeOpt, batch, &errMsg);
    if (errMsg != NULL) {
      rc = -1;
    }
  }

_EXIT:
  rocksdb_writebatch_destroy(batch);
  rocksdb_iter_destroy(it);
  rocksdb_readoptions_destroy(readOpts);
  taosMemoryFree(errMsg);

  return rc;
}
2866

2867
int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) {
×
2868
  int nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
×
2869

2870
  int32_t code = 0;
×
2871

2872
  int64_t         processVer = -1;
×
2873
  STaskDbWrapper* pTaskDb = NULL;
×
2874

2875
  code = taskDbOpen(path, key, 0, &processVer, &pTaskDb);
×
2876
  RocksdbCfInst* pSrcBackend = pCfInst;
×
2877

2878
  for (int i = 0; i < nCf; i++) {
×
2879
    rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i];
×
2880
    if (pSrcCf == NULL) continue;
×
2881

2882
    code = taskDbOpenCfByKey(pTaskDb, ginitDict[i].key);
×
2883
    if (code != 0) goto _EXIT;
×
2884

2885
    code = copyDataAt(pSrcBackend, pTaskDb, i);
×
2886
    if (code != 0) goto _EXIT;
×
2887
  }
2888

2889
_EXIT:
×
2890
  taskDbDestroy(pTaskDb, true);
×
2891

2892
  return code;
×
2893
}
2894
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) {
×
2895
  SBackendWrapper* handle = backend;
×
2896
  char*            err = NULL;
×
2897
  int64_t          streamId;
2898
  int32_t          taskId, dummy = 0;
×
2899
  char             suffix[64] = {0};
×
2900
  int32_t          code = 0;
×
2901

2902
  rocksdb_options_t**              cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
×
2903
  RocksdbCfParam*                  params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
×
2904
  rocksdb_comparator_t**           pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
×
2905
  rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
×
2906

2907
  for (int i = 0; i < nCf; i++) {
×
2908
    char* cf = cfs[i];
×
2909
    char  funcname[64] = {0};
×
2910

2911
    cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt);
×
2912
    if (i == 0) continue;
×
2913
    if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
×
2914
      rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
×
2915
      rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
×
2916
      rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
×
2917

2918
      rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
×
2919
      rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
×
2920

2921
      rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt);
×
2922
      params[i].tableOpt = tableOpt;
×
2923

2924
      int idx = streamStateGetCfIdx(NULL, funcname);
×
2925
      if (idx < 0 || idx >= sizeof(ginitDict) / sizeof(ginitDict[0])) {
×
2926
        stError("failed to open cf");
×
2927
        return -1;
×
2928
      }
2929
      SCfInit* cfPara = &ginitDict[idx];
×
2930

2931
      rocksdb_comparator_t* compare =
2932
          rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
×
2933
      rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare);
×
2934
      pCompare[i] = compare;
×
2935
    }
2936
  }
2937
  rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nCf, (const char* const*)cfs,
×
2938
                                               (const rocksdb_options_t* const*)cfOpts, cfHandle, &err);
2939
  if (err != NULL) {
×
2940
    stError("failed to open rocksdb cf, reason:%s", err);
×
2941
    taosMemoryFree(err);
×
2942
    taosMemoryFree(cfHandle);
×
2943
    taosMemoryFree(pCompare);
×
2944
    taosMemoryFree(params);
×
2945
    taosMemoryFree(cfOpts);
×
2946
    // fix other leak
2947
    return TSDB_CODE_THIRDPARTY_ERROR;
×
2948
  } else {
2949
    stDebug("succ to open rocksdb cf");
×
2950
  }
2951
  // close default cf
2952
  if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) {
×
2953
    rocksdb_column_family_handle_destroy(cfHandle[0]);
×
2954
    cfHandle[0] = NULL;
×
2955
  }
2956
  rocksdb_options_destroy(cfOpts[0]);
×
2957

2958
  handle->db = db;
×
2959

2960
  static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
2961
  for (int i = 0; i < nCf; i++) {
×
2962
    char* cf = cfs[i];
×
2963
    if (i == 0) continue;  // skip default column family, not set opt
×
2964

2965
    char funcname[64] = {0};
×
2966
    if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
×
2967
      char    idstr[128] = {0};
×
2968
      int32_t nBytes = snprintf(idstr, sizeof(idstr), "0x%" PRIx64 "-%d", streamId, taskId);
×
2969
      if (nBytes <= 0 || nBytes >= sizeof(idstr)) {
×
2970
        code = TSDB_CODE_OUT_OF_RANGE;
×
2971
        stError("failed to open cf since %s", tstrerror(code));
×
2972
        return code;
×
2973
      }
2974

2975
      int idx = streamStateGetCfIdx(NULL, funcname);
×
2976

2977
      RocksdbCfInst*  inst = NULL;
×
2978
      RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1);
×
2979
      if (pInst == NULL || *pInst == NULL) {
×
2980
        inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst));
×
2981
        inst->pHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
×
2982
        inst->cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
×
2983
        inst->wOpt = rocksdb_writeoptions_create();
×
2984
        inst->rOpt = rocksdb_readoptions_create();
×
2985
        inst->param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
×
2986
        inst->pBackend = handle;
×
2987
        inst->db = db;
×
2988
        inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
×
2989

2990
        inst->dbOpt = handle->dbOpt;
×
2991
        rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
×
2992
        TAOS_UNUSED(taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)));
×
2993
      } else {
2994
        inst = *pInst;
×
2995
      }
2996
      inst->cfOpt[idx] = cfOpts[i];
×
2997
      inst->pCompares[idx] = pCompare[i];
×
2998
      memcpy(&(inst->param[idx]), &(params[i]), sizeof(RocksdbCfParam));
×
2999
      inst->pHandle[idx] = cfHandle[i];
×
3000
    }
3001
  }
3002
  void* pIter = taosHashIterate(handle->cfInst, NULL);
×
3003
  while (pIter) {
×
3004
    RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
×
3005

3006
    for (int i = 0; i < cfLen; i++) {
×
3007
      if (inst->cfOpt[i] == NULL) {
×
3008
        rocksdb_options_t*                   opt = rocksdb_options_create_copy(handle->dbOpt);
×
3009
        rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
×
3010
        rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
×
3011
        rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
×
3012

3013
        rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
×
3014
        rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
×
3015

3016
        rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt);
×
3017

3018
        SCfInit* cfPara = &ginitDict[i];
×
3019

3020
        rocksdb_comparator_t* compare =
3021
            rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
×
3022
        rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
×
3023

3024
        inst->pCompares[i] = compare;
×
3025
        inst->cfOpt[i] = opt;
×
3026
        inst->param[i].tableOpt = tableOpt;
×
3027
      }
3028
    }
3029
    SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen};
×
3030
    inst->pCompareNode = streamBackendAddCompare(handle, &compare);
×
3031
    pIter = taosHashIterate(handle->cfInst, pIter);
×
3032
  }
3033

3034
  taosMemoryFree(cfHandle);
×
3035
  taosMemoryFree(pCompare);
×
3036
  taosMemoryFree(params);
×
3037
  taosMemoryFree(cfOpts);
×
3038
  return 0;
×
3039
}
3040

3041
void streamStateDestroyCompar(void* arg) {
×
3042
  SCfComparator* comp = (SCfComparator*)arg;
×
3043
  for (int i = 0; i < comp->numOfComp; i++) {
×
3044
    if (comp->comp[i]) rocksdb_comparator_destroy(comp->comp[i]);
×
3045
  }
3046
  taosMemoryFree(comp->comp);
×
3047
}
×
3048

3049
/*
 * Map a column family name to its index in ginitDict and, when a state is supplied, make sure
 * that column family exists in the backend used by the state.
 *
 * @param pState    may be NULL, in which case only the lookup is performed
 * @param funcName  column family name; must match a ginitDict key in length and content
 * @return the ginitDict index, or -1 when the name is unknown, when the selected backend is NULL,
 *         or when creating the column family fails
 *
 * The backend is pOwner->pRecalBackend when pTdbState->recalc is set, pOwner->pBackend otherwise.
 * The check-and-create step runs under the backend mutex.
 */
int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
  int    found = -1;
  size_t nameLen = strlen(funcName);

  for (int k = 0; k < sizeof(ginitDict) / sizeof(ginitDict[0]); k++) {
    if (nameLen != ginitDict[k].len) {
      continue;
    }
    if (strncmp(funcName, ginitDict[k].key, nameLen) == 0) {
      found = k;
      break;
    }
  }

  // pure lookup, or nothing found: no backend work needed
  if (pState == NULL || found == -1) {
    return found;
  }

  STaskDbWrapper* backend = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    backend = pState->pTdbState->pOwner->pRecalBackend;
  }
  if (backend == NULL) {
    return -1;
  }

  streamMutexLock(&backend->mutex);

  if (backend->pCf[found] == NULL) {
    char*                           errMsg = NULL;
    rocksdb_column_family_handle_t* pHandle =
        rocksdb_create_column_family(backend->db, backend->pCfOpts[found], ginitDict[found].key, &errMsg);
    if (errMsg == NULL) {
      stDebug("succ to open cf, %p %s_%s", pState, backend->idstr, funcName);
      backend->pCf[found] = pHandle;
    } else {
      stError("failed to open cf, %p %s_%s, reason:%s", pState, backend->idstr, funcName, errMsg);
      rocksdb_column_family_handle_destroy(pHandle);
      taosMemoryFree(errMsg);
      found = -1;
    }
  }

  streamMutexUnlock(&backend->mutex);

  return found;
}
3088
/*
 * Position `iter` at the key `buf`/`len`: first with rocksdb_iter_seek(), and when that leaves the
 * iterator invalid, with rocksdb_iter_seek_for_prev().
 *
 * @return true when the iterator is valid after either attempt, false otherwise
 */
bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) {
  rocksdb_iter_seek(iter, buf, len);
  if (rocksdb_iter_valid(iter)) {
    return true;
  }

  // forward seek found nothing, fall back to a backward seek
  rocksdb_iter_seek_for_prev(iter, buf, len);
  if (rocksdb_iter_valid(iter)) {
    return true;
  }
  return false;
}
3098
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKeyName, rocksdb_snapshot_t** snapshot,
13,158✔
3099
                                          rocksdb_readoptions_t** readOpt) {
3100
  int idx = streamStateGetCfIdx(pState, cfKeyName);
13,158✔
3101

3102
  *readOpt = rocksdb_readoptions_create();
13,158✔
3103

3104
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
13,156✔
3105
  if (pState->pTdbState->recalc) {
13,156!
3106
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
×
3107
  }
3108
  if (snapshot != NULL) {
13,156!
3109
    *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->db);
13,157✔
3110
    rocksdb_readoptions_set_snapshot(*readOpt, *snapshot);
13,157✔
3111
    rocksdb_readoptions_set_fill_cache(*readOpt, 0);
13,157✔
3112
  }
3113

3114
  return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]);
13,156✔
3115
}
3116

3117
/*
 * Encode `key` and `value` with the codec of column family `funcname` and write the pair to the
 * backend of `pState`.
 *
 * Expects an assignable variable named `code` in the enclosing scope: it is set to 0 on success and
 * to TSDB_CODE_THIRDPARTY_ERROR when the column family cannot be resolved or the write fails.
 * Every macro argument is parenthesised at its use so that expression arguments expand safely.
 * The statement keeps its historical trailing ';' so existing call sites compile unchanged.
 *
 * NOTE(review): streamStateGetCfIdx() resolves the column family against pRecalBackend when
 * pTdbState->recalc is set, while the write below always targets pBackend - confirm that writes
 * never happen in recalc mode.
 */
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen)                                                 \
  do {                                                                                                               \
    code = 0;                                                                                                        \
    char  buf[128] = {0};                                                                                            \
    char* err = NULL;                                                                                                \
    int   i = streamStateGetCfIdx((pState), (funcname));                                                             \
    if (i < 0) {                                                                                                     \
      stWarn("streamState failed to get cf name: %s", (funcname));                                                   \
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                             \
      break;                                                                                                         \
    }                                                                                                                \
    STaskDbWrapper* wrapper = (pState)->pTdbState->pOwner->pBackend;                                                 \
    TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));                                                      \
    char toString[128] = {0};                                                                                        \
    TAOS_UNUSED((ginitDict[i].toStrFunc((void*)(key), toString)));                                                   \
    int32_t                         klen = ginitDict[i].enFunc((void*)(key), buf);                                   \
    rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx];    \
    rocksdb_writeoptions_t*         opts = wrapper->writeOpt;                                                        \
    rocksdb_t*                      db = wrapper->db;                                                                \
    char*                           ttlV = NULL;                                                                     \
    int32_t                         ttlVLen = ginitDict[i].enValueFunc((char*)(value), (vLen), 0, &ttlV);            \
    rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err);             \
    if (err != NULL) {                                                                                               \
      stError("streamState str: %s failed to write to %s, err: %s", toString, (funcname), err);                      \
      taosMemoryFree(err);                                                                                           \
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                             \
    } else {                                                                                                         \
      stTrace("[StreamInternal] write streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p",       \
              toString, (funcname), (vLen), ttlVLen, wrapper);                                                       \
    }                                                                                                                \
    taosMemoryFree(ttlV);                                                                                            \
  } while (0);
3149

3150
/*
 * STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen)
 *
 * Looks up `key` in the column family selected by `funcname` and decodes the
 * stored bytes through ginitDict[i].deValueFunc into `*pVal`.
 *
 * Expects an lvalue named `code` in the enclosing scope; it is set to 0 on
 * success and to -1 when the column family is unknown, the key is missing,
 * the read fails, or the decoder reports a non-positive length.
 * `vLen`, when non-NULL, receives the decoded length (only on the branch
 * where a value was actually read).
 *
 * `pVal` is cast to char** and handed to the decoder, so callers must pass
 * the ADDRESS of their output pointer.
 */
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen)                                                   \
  do {                                                                                                                \
    code = 0;                                                                                                         \
    char  buf[128] = {0};                                                                                             \
    char* err = NULL;                                                                                                 \
    int   i = streamStateGetCfIdx(pState, funcname);                                                                  \
    if (i < 0) {                                                                                                      \
      stWarn("streamState failed to get cf name: %s", funcname);                                                      \
      code = -1;                                                                                                      \
      break;                                                                                                          \
    }                                                                                                                 \
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;                                                    \
    if (pState->pTdbState->recalc) {                                                                                  \
      wrapper = pState->pTdbState->pOwner->pRecalBackend;                                                             \
    }                                                                                                                 \
    char toString[128] = {0};                                                                                         \
    if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString)));                       \
    int32_t                         klen = ginitDict[i].enFunc((void*)key, buf);                                      \
    rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx];     \
    rocksdb_t*                      db = wrapper->db;                                                                 \
    rocksdb_readoptions_t*          opts = wrapper->readOpt;                                                          \
    size_t                          len = 0;                                                                          \
    char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err);                       \
    if (val == NULL || len == 0) {                                                                                    \
      if (err == NULL) {                                                                                              \
        stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
      } else {                                                                                                        \
        stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err);   \
        taosMemoryFreeClear(err);                                                                                     \
      }                                                                                                               \
      /* a zero-length hit can still carry an allocation; freeing NULL is harmless */                                 \
      taosMemoryFree(val);                                                                                            \
      code = -1;                                                                                                      \
    } else {                                                                                                          \
      int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal);                                          \
      if (tlen <= 0) {                                                                                                \
        stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr,         \
                funcname);                                                                                            \
        code = -1;                                                                                                    \
      } else {                                                                                                        \
        stTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname,     \
                tlen, wrapper);                                                                                       \
      }                                                                                                               \
      taosMemoryFree(val);                                                                                            \
      if (vLen != NULL) *vLen = tlen;                                                                                 \
    }                                                                                                                 \
  } while (0);
3196

3197
/*
 * STREAM_STATE_DEL_ROCKSDB(pState, funcname, key)
 *
 * Encodes `key` with the encoder registered for column family `funcname`
 * and issues a rocksdb_delete_cf on the backend selected by the recalc flag.
 *
 * Expects an lvalue named `code` in the enclosing scope: 0 on success,
 * TSDB_CODE_THIRDPARTY_ERROR when the column family is unknown or the
 * delete reports an error. Bumps wrapper->dataWritten by one per call.
 */
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key)                                                           \
  do {                                                                                                            \
    code = 0;                                                                                                     \
    char* err = NULL;                                                                                             \
    char  buf[128] = {0};                                                                                         \
    int   i = streamStateGetCfIdx(pState, funcname);                                                              \
    if (i < 0) {                                                                                                  \
      stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname);                     \
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                          \
      break;                                                                                                      \
    }                                                                                                             \
    STaskDbWrapper* wrapper = pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend                \
                                                        : pState->pTdbState->pOwner->pBackend;                    \
    TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));                                                   \
    char toString[128] = {0};                                                                                     \
    if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED(ginitDict[i].toStrFunc((void*)key, toString));                     \
    int32_t                         klen = ginitDict[i].enFunc((void*)key, buf);                                  \
    rocksdb_writeoptions_t*         opts = wrapper->writeOpt;                                                     \
    rocksdb_t*                      db = wrapper->db;                                                             \
    rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
    rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err);                                           \
    if (err == NULL) {                                                                                            \
      stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname);                  \
    } else {                                                                                                      \
      stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err);  \
      taosMemoryFree(err);                                                                                        \
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                          \
    }                                                                                                             \
  } while (0);
3228

3229
// state cf
3230
/*
 * Store `value` under (key, pState->number) in the "state" column family.
 *
 * When both a resultRowPut converter and pExprSupp are configured, the value
 * is first converted into a temporary buffer, that buffer is written, and it
 * is released afterwards; otherwise the caller's bytes are written as-is.
 * Returns 0 on success, the converter's error code, or the code set by the
 * put macro.
 */
int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  int       code = 0;
  char*     dst = NULL;
  size_t    size = 0;
  SStateKey sKey = {.key = *key, .opNum = pState->number};

  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
    code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
    if (code != 0) {
      return code;
    }
    STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size);
    taosMemoryFree(dst);
  } else {
    STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, (int32_t)vLen);
  }

  // size stays 0 on the pass-through path
  stTrace("[StreamInternal] user len:%d, rocks len:%zu", vLen, size);
  return code;
}
3250
/*
 * Read the value stored under (key, pState->number) from the "state" column
 * family.
 *
 * On success *pVal receives a heap buffer and *pVLen its length. When a
 * resultRowGet converter and pExprSupp are both configured, the raw bytes are
 * run through the converter and the raw buffer is released here; otherwise
 * the raw buffer itself is handed to the caller.
 * Returns 0 on success, a non-zero code from the lookup or the converter.
 */
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  int       code = 0;
  char*     tVal = NULL;
  size_t    tValLen = 0;
  SStateKey sKey = {.key = *key, .opNum = pState->number};

  STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, &tVal, &tValLen);
  if (code != 0) {
    taosMemoryFree(tVal);
    return code;
  }

  if (pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
    size_t convertedLen = 0;
    code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &convertedLen);
    *pVLen = (int32_t)convertedLen;
    taosMemoryFree(tVal);
    return code;
  }

  // no converter: the raw buffer becomes the caller's
  *pVal = tVal;
  *pVLen = tValLen;
  return code;
}
3272
/*
 * Delete the entry stored under (key, pState->number) from the "state"
 * column family. Returns the code set by STREAM_STATE_DEL_ROCKSDB.
 */
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  int       code = 0;

  STREAM_STATE_DEL_ROCKSDB(pState, "state", &sKey);

  return code;
}
3278
/*
 * Range-delete every "state" key belonging to pState->number, then compact
 * the same range when the delete succeeded.
 *
 * The range runs from {ts = 0, groupId = 0} to {ts = INT64_MAX,
 * groupId = UINT64_MAX}. A failed range delete is only logged; the function
 * always returns 0.
 */
int32_t streamStateClear_rocksdb(SStreamState* pState) {
  stDebug("streamStateClear_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend
                                                      : pState->pTdbState->pOwner->pBackend;

  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));

  SStateKey lowKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
  SStateKey highKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number};

  char lowBuf[128] = {0};
  char highBuf[128] = {0};
  int  lowLen = stateKeyEncode(&lowKey, lowBuf);
  int  highLen = stateKeyEncode(&highKey, highBuf);

  // pCf[1] is the handle this function operates on; nothing to do without it
  if (wrapper->pCf[1] == NULL) {
    return 0;
  }

  char* err = NULL;
  rocksdb_delete_range_cf(wrapper->db, wrapper->writeOpt, wrapper->pCf[1], lowBuf, lowLen, highBuf, highLen, &err);
  if (err == NULL) {
    rocksdb_compact_range_cf(wrapper->db, wrapper->pCf[1], lowBuf, lowLen, highBuf, highLen);
    return 0;
  }

  char lowStr[128] = {0};
  char highStr[128] = {0};
  TAOS_UNUSED(stateKeyToString(&lowKey, lowStr));
  TAOS_UNUSED(stateKeyToString(&highKey, highStr));

  stWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", lowStr, highStr, err);
  taosMemoryFree(err);
  return 0;
}
3314
/*
 * Advance the cursor's iterator by one entry. Does nothing when the cursor
 * or its iterator is NULL, or when the iterator is no longer valid.
 */
void streamStateCurNext_rocksdb(SStreamStateCur* pCur) {
  if (pCur == NULL || pCur->iter == NULL) {
    return;
  }
  if (!rocksdb_iter_valid(pCur->iter)) {
    return;
  }
  rocksdb_iter_next(pCur->iter);
}
2,565✔
3319
/*
 * Fetch the first "state" key that follows {ts = 0, groupId = 0}.
 *
 * A temporary entry is written at that position, a cursor is opened just
 * after it, the key under the cursor is copied into *key, and the temporary
 * entry is deleted again.
 *
 * The cursor is released and the temporary entry is deleted on every path
 * after the initial put, including when the lookup fails. The original
 * returned early on lookup failure, leaking the cursor and leaving the
 * temporary entry in the store.
 *
 * Returns 0 on success; otherwise the first failing step's code (put,
 * lookup, then delete).
 */
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
  stDebug("streamStateGetFirst_rocksdb");

  SWinKey tmp = {.ts = 0, .groupId = 0};
  int32_t code = streamStatePut_rocksdb(pState, &tmp, NULL, 0);
  if (code != 0) {
    return code;
  }

  // pCur may be NULL; the lookup then reports failure and the free is a no-op
  SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
  code = streamStateGetKVByCur_rocksdb(pState, pCur, key, NULL, NULL);
  streamStateFreeCur(pCur);

  int32_t delCode = streamStateDel_rocksdb(pState, &tmp);
  return (code != 0) ? code : delCode;
}
3336

3337
/*
 * Read the entry under the cursor via streamStateFillGetKVByCur_rocksdb and
 * accept it only if it belongs to the group id that pKey carried on entry.
 *
 * Returns 0 when the entry was read and its group matches. Returns -1 for a
 * NULL cursor, a failed read, or a group mismatch; on mismatch any value
 * returned through pVal is freed and *pVal is reset to NULL.
 */
int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
                                               int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }

  // remember the requested group before pKey is overwritten by the read
  const uint64_t wantedGroup = pKey->groupId;

  if (streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen) != 0) {
    return -1;
  }
  if (pKey->groupId == wantedGroup) {
    return 0;
  }

  if (pVal != NULL) {
    taosMemoryFree((void*)*pVal);
    *pVal = NULL;
  }
  return -1;
}
3356
/*
 * Return the stored value for `key`, or a zero-filled buffer when the lookup
 * fails.
 *
 * *pVLen is read on entry as the size of the buffer to allocate if the key
 * cannot be read. Returns 0 on success (found or freshly allocated) and
 * terrno when the allocation fails.
 */
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  stDebug("streamStateAddIfNotExist_rocksdb");

  const int32_t size = *pVLen;  // captured before the lookup can touch *pVLen

  if (streamStateGet_rocksdb(pState, key, pVal, pVLen) != 0) {
    void* fresh = taosMemoryMalloc(size);
    *pVal = fresh;
    if (fresh == NULL) {
      return terrno;
    }
    memset(fresh, 0, size);
  }
  return 0;
}
3369
/*
 * Step the cursor's iterator back by one entry.
 *
 * Does nothing when the cursor or its iterator is NULL. The iterator guard
 * mirrors streamStateCurNext_rocksdb; the original passed a possibly-NULL
 * iterator straight to rocksdb_iter_prev.
 */
void streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
  if (pCur == NULL || pCur->iter == NULL) {
    return;
  }
  rocksdb_iter_prev(pCur->iter);
}
22✔
3374
/*
 * Read the key (and optionally the value) of the "state" entry the cursor
 * currently points at.
 *
 * pKey   receives the decoded window key on success.
 * pVLen  when NULL, the value is not touched at all; otherwise it receives
 *        the length of the returned (or decoded) value.
 * pVal   when non-NULL (and pVLen is non-NULL), receives a heap buffer the
 *        caller must free.
 *
 * Returns 0 on success; -1 when the cursor is NULL, the iterator is invalid,
 * the entry is stale, it belongs to another operator number, or the value
 * fails to decode; otherwise the converter's error code.
 *
 * Fixes over the previous version:
 *  - the decode destination was `(char**)val` with val == NULL, i.e. a NULL
 *    out-pointer, so no value buffer was ever produced; it is now `&val`.
 *    NOTE(review): assumes valueDecode allocates *dest when dest is
 *    non-NULL, the convention the GET macro relies on -- confirm.
 *  - the fallback log line dereferenced pState even when pState was NULL.
 *  - the converter output no longer aliases the input buffer that is freed
 *    right after the call.
 */
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
                                      int32_t* pVLen) {
  if (!pCur) return -1;

  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    return -1;
  }

  SStateKey tkey = {0};
  size_t    klen = 0;
  char*     keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
  TAOS_UNUSED(stateKeyDecode((void*)&tkey, keyStr));
  if (tkey.opNum != pCur->number) {
    return -1;
  }

  if (pVLen != NULL) {
    size_t      vlen = 0;
    const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
    char*       val = NULL;
    int32_t     len = valueDecode((void*)valStr, vlen, NULL, &val);
    if (len <= 0) {
      taosMemoryFree(val);
      return -1;
    }

    if (pVal == NULL) {
      // caller only wants the length
      taosMemoryFree(val);
      *pVLen = len;
    } else if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
      char*  converted = NULL;
      size_t convertedLen = 0;
      int    code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, &converted, &convertedLen);
      taosMemoryFree(val);
      if (code != 0) {
        return code;
      }
      *pVal = converted;
      *pVLen = (int32_t)convertedLen;
    } else {
      stInfo("streamStateGetKVByCur_rocksdb, pState = %p, pResultRowStore = %p, pExprSupp = %p", pState,
             pState ? pState->pResultRowStore.resultRowGet : NULL, pState ? pState->pExprSupp : NULL);
      // no converter available: hand the decoded buffer to the caller
      *pVal = val;
      *pVLen = len;
    }
  }

  *pKey = tkey.key;
  return 0;
}
3427
/*
 * Open a fill cursor at `key` and keep it only if the entry under it can be
 * read and belongs to key's group. Returns the cursor on success; otherwise
 * frees it (if any) and returns NULL.
 */
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
  SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
  if (pCur == NULL) {
    return NULL;
  }

  if (streamStateFillGetGroupKVByCur_rocksdb(pCur, key, NULL, NULL) == 0) {
    return pCur;
  }

  streamStateFreeCur(pCur);
  return NULL;
}
3436

3437
/*
 * Create a "state" cursor seeked to (key, pState->number), skip stale
 * entries, and step once more unless the requested key compares greater
 * than the entry found.
 *
 * Returns NULL when the cursor cannot be allocated, the seek lands on
 * nothing, or only stale entries remain. The returned cursor's iterator may
 * be invalid after the final step; callers check validity when reading.
 */
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  STaskDbWrapper* wrapper = pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend
                                                      : pState->pTdbState->pOwner->pBackend;
  pCur->number = pState->number;
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);

  SStateKey target = {.key = *key, .opNum = pState->number};
  char      encoded[128] = {0};
  int       encodedLen = stateKeyEncode((void*)&target, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // skip ttl expired data
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_next(pCur->iter);
  }
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateKey found;
  size_t    foundLen;
  char*     foundStr = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
  TAOS_UNUSED(stateKeyDecode((void*)&found, foundStr));

  // only step when the requested key does not compare greater than the entry found
  if (stateKeyCmpr(&target, sizeof(target), &found, sizeof(found)) <= 0) {
    rocksdb_iter_next(pCur->iter);
  }
  return pCur;
}
3477

3478
/*
 * Return a cursor positioned on the last non-stale "state" entry, or NULL
 * when there is none.
 *
 * A sentinel entry with the maximum possible key is written first, an
 * iterator is seeked to it and stepped backwards past it and past any stale
 * entries, and the sentinel is deleted again before returning.
 *
 * Fixes over the previous version:
 *  - when cursor allocation failed the function returned without deleting
 *    the sentinel; the delete now runs on every path after a successful put.
 *  - a NULL iterator from streamStateIterCreate is no longer handed to the
 *    rocksdb iterator calls.
 */
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
  int32_t code = 0;

  const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
  STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
  if (code != 0) {
    return NULL;
  }

  {
    char tbuf[256] = {0};
    TAOS_UNUSED(stateKeyToString((void*)&maxStateKey, tbuf));
    stDebug("seek to last:%s", tbuf);
  }

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur != NULL) {
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
    if (pState->pTdbState->recalc) {
      wrapper = pState->pTdbState->pOwner->pRecalBackend;
    }

    pCur->number = pState->number;
    pCur->db = wrapper->db;
    pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
                                       (rocksdb_readoptions_t**)&pCur->readOpt);

    if (pCur->iter != NULL) {
      char    buf[128] = {0};
      int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
      rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
      // step off the sentinel, then past expired entries
      rocksdb_iter_prev(pCur->iter);
      while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
        rocksdb_iter_prev(pCur->iter);
      }
    }

    if (pCur->iter == NULL || !rocksdb_iter_valid(pCur->iter)) {
      streamStateFreeCur(pCur);
      pCur = NULL;
    }
  }

  // always remove the sentinel; the macro overwrites `code`, which is not reported from here
  STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
  return pCur;
}
3523

3524
/*
 * Return a "state" cursor positioned exactly on (key, pState->number).
 *
 * Returns NULL when the cursor or its iterator cannot be created, when the
 * seek lands on a stale or different entry, or when nothing is found.
 *
 * Changes over the previous version: a NULL iterator from
 * streamStateIterCreate is rejected before rocksdb_iter_seek is called, and
 * the redundant second assignment of pCur->number is gone.
 */
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateGetCur_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) return NULL;

  pCur->db = wrapper->db;
  pCur->number = pState->number;
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  if (pCur->iter == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateKey sKey = {.key = *key, .opNum = pState->number};
  char      buf[128] = {0};
  int       len = stateKeyEncode((void*)&sKey, buf);

  rocksdb_iter_seek(pCur->iter, buf, len);

  if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {
    SStateKey curKey;
    size_t    kLen = 0;
    char*     keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
    TAOS_UNUSED(stateKeyDecode((void*)&curKey, keyStr));

    // only an exact match counts
    if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) {
      return pCur;
    }
  }

  streamStateFreeCur(pCur);
  return NULL;
}
3560

3561
// func cf
3562
/*
 * Store `value` under `key` in the "func" column family.
 *
 * With a resultRowPut converter and pExprSupp configured, the converted
 * buffer is written and then released; otherwise the caller's bytes are
 * written unchanged. Returns 0 on success, the converter's error code, or
 * the code set by the put macro.
 */
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
  int    code = 0;
  char*  dst = NULL;
  size_t size = 0;

  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
    code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
    if (code != 0) {
      return code;
    }
    STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, (int32_t)size);
    taosMemoryFree(dst);
    return code;
  }

  // pass-through path: no converter configured
  STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, (int32_t)vLen);
  return code;
}
3579
/*
 * Read the value stored under `key` from the "func" column family.
 *
 * On success *pVal receives a heap buffer and *pVLen its length. With a
 * resultRowGet converter and pExprSupp configured, the raw bytes are
 * converted and the raw buffer is released here; otherwise the raw buffer is
 * handed to the caller. Returns 0 on success, or a non-zero code from the
 * lookup or the converter.
 *
 * Fix: the GET macro casts its pVal argument to char** and passes it to the
 * decoder, so it needs the ADDRESS of tVal. The previous code passed tVal
 * itself (NULL), so the decoded value was never stored and *pVal was always
 * set to NULL on the pass-through path. This now matches
 * streamStateGet_rocksdb.
 */
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
  int    code = 0;
  char*  tVal = NULL;
  size_t tValLen = 0;

  STREAM_STATE_GET_ROCKSDB(pState, "func", key, &tVal, &tValLen);
  if (code != 0) {
    taosMemoryFree(tVal);
    return code;
  }

  if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) {
    *pVal = tVal;
    *pVLen = (int32_t)tValLen;
    return code;
  }

  size_t pValLen = 0;
  code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen);
  *pVLen = (int32_t)pValLen;

  taosMemoryFree(tVal);
  return code;
}
3602
/*
 * Delete the entry stored under `key` from the "func" column family.
 *
 * Returns the code set by STREAM_STATE_DEL_ROCKSDB (0 on success). The
 * previous version returned a hard-coded 0 and so hid delete failures,
 * unlike streamStateDel_rocksdb and streamStateSessionDel_rocksdb.
 */
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
  return code;
}
3607

3608
// session cf
3609
/*
 * Store `value` under (key, pState->number) in the "sess" column family.
 *
 * A NULL or empty value is logged as an error but still written. With a
 * resultRowPut converter and pExprSupp configured, the converted buffer is
 * written and then released; otherwise the caller's bytes are written as-is.
 * Returns 0 on success, the converter's error code, or the code set by the
 * put macro.
 */
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
  int              code = 0;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};

  if (value == NULL || vLen == 0) {
    stError("streamStateSessionPut_rocksdb val: %p, len: %d", value, vLen);
  }

  char*  dst = NULL;
  size_t size = 0;

  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
    code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
    if (code != 0) {
      return code;
    }

    stTrace("[StreamInternal] raw len:%d to rocks len:%zu", vLen, size);
    STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, (int32_t)size);
    taosMemoryFree(dst);
  } else {
    STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, (void*)value, (int32_t)vLen);
  }

  return code;
}
3633
/*
 * Look up the session entry whose window start equals key->win.skey.
 *
 * A cursor is seeked to `key`; the entry under it is accepted only if the
 * read succeeds and its skey matches. On success *key is replaced by the
 * stored key, *pVal (if non-NULL) takes ownership of the value buffer and
 * *pVLen (if non-NULL) receives its length. Returns 0 on success, -1
 * otherwise. The cursor and any unclaimed value buffer are always released.
 */
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
  stDebug("streamStateSessionGet_rocksdb");

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
  SSessionKey      found = *key;
  void*            value = NULL;
  int32_t          valueLen = 0;

  int code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &found, &value, &valueLen);
  if (code != 0 || key->win.skey != found.win.skey) {
    code = -1;
  } else {
    *key = found;
    if (pVal != NULL) {
      *pVal = value;
      value = NULL;  // ownership moved to the caller
    }
    if (pVLen != NULL) {
      *pVLen = valueLen;
    }
  }

  taosMemoryFree(value);
  streamStateFreeCur(pCur);
  return code;
}
3658

3659
/*
 * Delete the entry stored under (key, pState->number) from the "sess"
 * column family. Returns the code set by STREAM_STATE_DEL_ROCKSDB.
 */
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) {
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int              code = 0;

  STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);

  return code;
}
3665

3666
/*
 * Return a cursor positioned on the last non-stale "sess" entry that sorts
 * before the maximum key of `groupId`, or NULL when there is none.
 *
 * A sentinel entry {groupId, skey = ekey = INT64_MAX} is written, an
 * iterator is seeked to it and stepped backwards past it and past stale
 * entries, and the sentinel is deleted again before returning.
 *
 * Fixes over the previous version:
 *  - the result of createStreamStateCursor() was dereferenced without a
 *    NULL check.
 *  - a NULL iterator from streamStateIterCreate is no longer passed to the
 *    rocksdb iterator calls.
 *  The sentinel delete runs on every path after a successful put.
 */
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, int64_t groupId) {
  stDebug("streamStateSessionSeekToLast_rocksdb");

  int32_t code = 0;

  SSessionKey      maxSessionKey = {.groupId = groupId, .win = {.skey = INT64_MAX, .ekey = INT64_MAX}};
  SStateSessionKey maxKey = {.key = maxSessionKey, .opNum = pState->number};

  STREAM_STATE_PUT_ROCKSDB(pState, "sess", &maxKey, "", 0);
  if (code != 0) {
    return NULL;
  }

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur != NULL) {
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
    if (pState->pTdbState->recalc) {
      wrapper = pState->pTdbState->pOwner->pRecalBackend;
    }

    pCur->number = pState->number;
    pCur->db = wrapper->db;
    pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                       (rocksdb_readoptions_t**)&pCur->readOpt);

    if (pCur->iter != NULL) {
      char    buf[128] = {0};
      int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf);
      rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
      // step off the sentinel, then past expired entries
      rocksdb_iter_prev(pCur->iter);
      while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
        rocksdb_iter_prev(pCur->iter);
      }
    }

    if (pCur->iter == NULL || !rocksdb_iter_valid(pCur->iter)) {
      streamStateFreeCur(pCur);
      pCur = NULL;
    }
  }

  // always remove the sentinel; the macro overwrites `code`, which is not reported from here
  STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey);
  return pCur;
}
3705

3706
/*
 * Step a session cursor's iterator back by one entry.
 *
 * Returns 0 after stepping, -1 when the cursor or its iterator is NULL.
 * The debug line now names this function (it previously printed
 * "streamStateCurPrev_rocksdb"), and a NULL iterator is rejected instead of
 * being passed to rocksdb_iter_prev.
 */
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) {
  stDebug("streamStateSessionCurPrev_rocksdb");
  if (pCur == NULL || pCur->iter == NULL) {
    return -1;
  }

  rocksdb_iter_prev(pCur->iter);
  return 0;
}
3713

3714
/*
 * Return a "sess" cursor positioned on `key` or, failing that, on the entry
 * just before it.
 *
 * The iterator is seeked to (key, pState->number) and moved backwards over
 * stale entries. If the requested key compares >= the entry found, the
 * cursor is returned there; otherwise the iterator steps back once more.
 * Returns NULL when the cursor or iterator cannot be created, the seek finds
 * nothing, or stepping back runs off the start.
 *
 * Cleanup over the previous version: the unused local `c` and a second
 * validity check that could never fire (nothing moves the iterator between
 * the two checks) are removed. Behavior is unchanged.
 */
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  pCur->number = pState->number;
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  if (pCur->iter == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  char             buf[128] = {0};
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int              len = stateSessionKeyEncode(&sKey, buf);
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // skip ttl expired data, moving towards smaller keys
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  size_t           klen;
  const char*      iKey = rocksdb_iter_key(pCur->iter, &klen);
  SStateSessionKey curKey = {0};
  TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
  if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) {
    return pCur;
  }

  // the entry found sorts after the requested key: step back one more
  rocksdb_iter_prev(pCur->iter);
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}
3768
/*
 * Return a "sess" cursor positioned on `key` or, failing that, on the entry
 * just after it.
 *
 * The iterator is seeked to (key, pState->number). A stale entry at that
 * position yields NULL. If the requested key compares <= the entry found,
 * the cursor is returned there; otherwise the iterator advances once.
 * Returns NULL when the cursor cannot be allocated, the seek finds nothing,
 * or advancing runs off the end.
 */
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend
                                                      : pState->pTdbState->pOwner->pBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  char             encoded[128] = {0};
  int              encodedLen = stateSessionKeyEncode(&target, encoded);

  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen) || iterValueIsStale(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  size_t           foundLen;
  const char*      foundStr = rocksdb_iter_key(pCur->iter, &foundLen);
  SStateSessionKey found = {0};
  TAOS_UNUSED(stateSessionKeyDecode(&found, (char*)foundStr));
  if (stateSessionKeyCmpr(&target, sizeof(target), &found, sizeof(found)) <= 0) {
    return pCur;
  }

  // the entry found sorts before the requested key: move forward once
  rocksdb_iter_next(pCur->iter);
  if (rocksdb_iter_valid(pCur->iter)) {
    return pCur;
  }
  streamStateFreeCur(pCur);
  return NULL;
}
3809

3810
/*
 * Open a cursor on the "sess" column family positioned after `key`.
 *
 * Seeks to {*key, pState->number}, steps forward over stale entries, and keeps the entry when
 * stateSessionKeyCmpr(search, found) < 0; otherwise the iterator is advanced one more entry.
 *
 * Returns the cursor, or NULL (cursor already freed) when the cursor cannot be created or the
 * iterator is not valid after the seek, after skipping stale entries, or after the final step.
 */
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyNext_rocksdb");

  // When pTdbState->recalc is set, the owner's pRecalBackend is used instead of pBackend.
  STaskDbWrapper* pDb = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) pDb = pState->pTdbState->pOwner->pRecalBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) return NULL;

  pCur->db = pDb->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  SStateSessionKey found = {0};
  size_t           foundLen = 0;
  char             encoded[128] = {0};
  int              encodedLen = stateSessionKeyEncode(&target, encoded);

  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) goto _fail;

  // Step forward past stale entries.
  for (; rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter); rocksdb_iter_next(pCur->iter)) {
  }
  if (!rocksdb_iter_valid(pCur->iter)) goto _fail;

  const char* rawKey = rocksdb_iter_key(pCur->iter, &foundLen);
  TAOS_UNUSED(stateSessionKeyDecode(&found, (char*)rawKey));

  if (stateSessionKeyCmpr(&target, sizeof(target), &found, sizeof(found)) >= 0) {
    rocksdb_iter_next(pCur->iter);
    if (!rocksdb_iter_valid(pCur->iter)) goto _fail;
  }
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
}
3853

3854
/*
 * Open a cursor on the "sess" column family positioned before `key`.
 *
 * Seeks to {*key, pState->number}, steps backward over stale entries, and keeps the entry when
 * stateSessionKeyCmpr(search, found) > 0; otherwise the iterator is moved back one more entry.
 *
 * Returns the cursor, or NULL (cursor already freed) when the cursor cannot be created or the
 * iterator is not valid after the seek, after skipping stale entries, or after the final step.
 */
SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyPrev_rocksdb");

  // When pTdbState->recalc is set, the owner's pRecalBackend is used instead of pBackend.
  STaskDbWrapper* pDb = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) pDb = pState->pTdbState->pOwner->pRecalBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) return NULL;

  pCur->db = pDb->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  SStateSessionKey found = {0};
  size_t           foundLen = 0;
  char             encoded[128] = {0};
  int              encodedLen = stateSessionKeyEncode(&target, encoded);

  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) goto _fail;

  // Step backward past stale entries.
  for (; rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter); rocksdb_iter_prev(pCur->iter)) {
  }
  if (!rocksdb_iter_valid(pCur->iter)) goto _fail;

  const char* rawKey = rocksdb_iter_key(pCur->iter, &foundLen);
  TAOS_UNUSED(stateSessionKeyDecode(&found, (char*)rawKey));

  if (stateSessionKeyCmpr(&target, sizeof(target), &found, sizeof(found)) <= 0) {
    rocksdb_iter_prev(pCur->iter);
    if (!rocksdb_iter_valid(pCur->iter)) goto _fail;
  }
  return pCur;

_fail:
  streamStateFreeCur(pCur);
  return NULL;
}
3896

3897
/*
 * Read the session key and value the cursor currently points at.
 *
 * pState  may be NULL; when it, its resultRowGet callback and pExprSupp are all non-NULL the
 *         decoded value is passed through resultRowGet before being handed out.
 * pCur    cursor to read from; NULL yields -1.
 * pKey    in: a non-zero groupId acts as a filter; out: the decoded session key on success.
 * pVal    optional out: the value buffer (set to NULL first; the decoded buffer is freed here
 *         when pVal is NULL).
 * pVLen   optional out: the value length (reset to 0 first).
 *
 * Returns 0 on success, the resultRowGet error code when that callback fails, and -1 when the
 * iterator is invalid or stale, the value cannot be decoded, the stored opNum differs from
 * pCur->number, or the groupId filter does not match.
 */
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey,
                                             void** pVal, int32_t* pVLen) {
  if (pCur == NULL) return -1;
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) return -1;

  size_t           rawKeyLen = 0;
  size_t           rawValLen = 0;
  SStateSessionKey decodedKey = {0};

  const char* rawKey = rocksdb_iter_key(pCur->iter, (size_t*)&rawKeyLen);
  TAOS_UNUSED(stateSessionKeyDecode((void*)&decodedKey, (char*)rawKey));

  // Outputs are cleared before any of the rejection checks below.
  if (pVal != NULL) *pVal = NULL;
  if (pVLen != NULL) *pVLen = 0;

  const char* rawVal = rocksdb_iter_value(pCur->iter, (size_t*)&rawValLen);
  char*       val = NULL;
  int32_t     len = valueDecode((void*)rawVal, rawValLen, NULL, &val);

  // Reject undecodable values, entries of another operator number, and foreign groups.
  if (len < 0 || decodedKey.opNum != pCur->number ||
      (pKey->groupId != 0 && pKey->groupId != decodedKey.key.groupId)) {
    taosMemoryFree(val);
    return -1;
  }

  char*  userVal = val;
  size_t userLen = len;

  if (pVal == NULL) {
    // Caller does not want the value; drop the decoded buffer.
    taosMemoryFree(val);
  } else {
    if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
      int code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&userVal,
                                                       (size_t*)&userLen);
      // The decoded buffer is released whether or not the conversion succeeded.
      taosMemoryFree(val);
      if (code != 0) {
        return code;
      }
    }
    *pVal = (char*)userVal;
  }

  if (pVLen != NULL) *pVLen = (int32_t)userLen;

  *pKey = decodedKey.key;
  qTrace("[StreamInternal]: rocsks val len:%d to user_val_len:%zu", len, userLen);

  return 0;
}
3958
// fill cf
3959
/*
 * Write `value` (vLen bytes) under `key` in the "fill" column family.
 *
 * The work is done by the STREAM_STATE_PUT_ROCKSDB macro (defined outside this chunk), which
 * is presumably what updates `code`; the function returns `code`, which starts at 0.
 */
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  int code = 0;  // referenced by name from the macro below

  STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen);

  return code;
}
3965

3966
/*
 * Look up `key` in the "fill" column family, handing the result out through pVal / pVLen.
 *
 * The work is done by the STREAM_STATE_GET_ROCKSDB macro (defined outside this chunk), which
 * is presumably what updates `code`; the function returns `code`, which starts at 0.
 */
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  int code = 0;  // referenced by name from the macro below

  STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);

  return code;
}
3972
/*
 * Delete `key` from the "fill" column family.
 *
 * The work is done by the STREAM_STATE_DEL_ROCKSDB macro (defined outside this chunk), which
 * is presumably what updates `code`; the function returns `code`, which starts at 0.
 */
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
  int code = 0;  // referenced by name from the macro below

  STREAM_STATE_DEL_ROCKSDB(pState, "fill", key);

  return code;
}
3977

3978
/*
 * Open a cursor on the "fill" column family positioned exactly on `key`.
 *
 * Returns the cursor only when the seek lands on a valid, non-stale entry whose decoded key
 * compares equal to `key` (winKeyCmpr == 0). In every other case, including a failure to
 * create the cursor, NULL is returned and any created cursor has been freed.
 */
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateFillGetCur_rocksdb");

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) return NULL;

  // When pTdbState->recalc is set, the owner's pRecalBackend is used instead of pBackend.
  STaskDbWrapper* pDb = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) pDb = pState->pTdbState->pOwner->pRecalBackend;

  pCur->db = pDb->db;
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  char encoded[128] = {0};
  int  encodedLen = winKeyEncode((void*)key, encoded);

  if (streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen) && !iterValueIsStale(pCur->iter) &&
      rocksdb_iter_valid(pCur->iter)) {
    size_t  foundLen;
    SWinKey found;
    char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
    TAOS_UNUSED(winKeyDecode((void*)&found, rawKey));
    if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) == 0) {
      return pCur;
    }
  }

  // No exact match: nothing to hand back.
  streamStateFreeCur(pCur);
  return NULL;
}
4017
/*
 * Read the window key and value the "fill" cursor currently points at.
 *
 * pCur   cursor to read from; NULL yields -1.
 * pKey   out: the decoded window key (written only on success).
 * pVal   out: receives the buffer produced by valueDecode.
 * pVLen  optional out: the decoded value length.
 *
 * Returns 0 on success and -1 when the cursor is NULL, the iterator is invalid or stale, or
 * valueDecode reports a negative length.
 */
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) return -1;
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) return -1;

  SWinKey decodedKey;
  size_t  rawKeyLen, rawValLen;

  char* rawKey = (char*)rocksdb_iter_key(pCur->iter, &rawKeyLen);
  TAOS_UNUSED(winKeyDecode(&decodedKey, rawKey));

  const char* rawVal = rocksdb_iter_value(pCur->iter, &rawValLen);
  int32_t     decodedLen = valueDecode((void*)rawVal, rawValLen, NULL, (char**)pVal);
  if (decodedLen < 0) return -1;

  if (pVLen != NULL) *pVLen = decodedLen;
  *pKey = decodedKey;
  return 0;
}
4039

4040
/*
 * Open a cursor on the "fill" column family positioned after `key`.
 *
 * Seeks to `key`, steps forward over stale entries, and advances one more entry unless
 * winKeyCmpr(key, found) < 0. The iterator is not re-validated after that last step, so the
 * returned cursor may point past the end.
 *
 * Returns NULL (cursor already freed) when the cursor cannot be created, the seek fails, or
 * no valid entry remains after skipping stale data.
 */
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateFillSeekKeyNext_rocksdb");

  // When pTdbState->recalc is set, the owner's pRecalBackend is used instead of pBackend.
  STaskDbWrapper* pDb = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) pDb = pState->pTdbState->pOwner->pRecalBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) return NULL;

  pCur->db = pDb->db;
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  char encoded[128] = {0};
  int  encodedLen = winKeyEncode((void*)key, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // skip stale data
  for (; rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter); rocksdb_iter_next(pCur->iter)) {
  }

  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SWinKey found;
  size_t  foundLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
  TAOS_UNUSED(winKeyDecode((void*)&found, rawKey));
  if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) >= 0) {
    rocksdb_iter_next(pCur->iter);
  }
  return pCur;
}
4081
/*
 * Open a cursor on the "fill" column family positioned before `key`.
 *
 * Seeks to `key`, steps backward over stale entries, and moves back one more entry unless
 * winKeyCmpr(key, found) > 0. The iterator is not re-validated after that last step, so the
 * returned cursor may point before the first entry.
 *
 * Returns NULL (cursor already freed) when the cursor cannot be created, the seek fails, or
 * no valid entry remains after skipping stale data.
 */
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateFillSeekKeyPrev_rocksdb");

  // When pTdbState->recalc is set, the owner's pRecalBackend is used instead of pBackend.
  STaskDbWrapper* pDb = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) pDb = pState->pTdbState->pOwner->pRecalBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) return NULL;

  pCur->db = pDb->db;
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  char encoded[128] = {0};
  int  encodedLen = winKeyEncode((void*)key, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // Step backward past stale entries.
  for (; rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter); rocksdb_iter_prev(pCur->iter)) {
  }

  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SWinKey found;
  size_t  foundLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
  TAOS_UNUSED(winKeyDecode((void*)&found, rawKey));
  if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) <= 0) {
    rocksdb_iter_prev(pCur->iter);
  }
  return pCur;
}
4122

4123
/*
 * Open a "fill" cursor by seeking backward from the largest groupId / ts pair
 * (UINT64_MAX / INT64_MAX) via streamStateFillSeekKeyPrev_rocksdb.
 */
SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) {
  SWinKey upperBound = {.groupId = UINT64_MAX, .ts = INT64_MAX};

  return streamStateFillSeekKeyPrev_rocksdb(pState, &upperBound);
}
4127

4128
#ifdef BUILD_NO_CALL
4129
/*
 * Find a stored session key whose range matches `key` (sessionRangeKeyCmpr == 0).
 *
 * The entry found by seeking to {*key, pState->number} is tried first. If it does not match,
 * one neighbour is tried: the next entry when stateSessionKeyCmpr(search, found) > 0, the
 * previous entry when it is < 0.
 *
 * Returns 0 and stores the match in *curKey, or -1 when the cursor cannot be created, the
 * seek fails, or neither candidate matches. The cursor is always freed before returning.
 */
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
  stDebug("streamStateSessionGetKeyByRange_rocksdb");

  // When pTdbState->recalc is set, the owner's pRecalBackend is used instead of pBackend.
  STaskDbWrapper* pDb = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) pDb = pState->pTdbState->pOwner->pRecalBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) return -1;

  pCur->db = pDb->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  char             encoded[128] = {0};
  int              encodedLen = stateSessionKeyEncode(&target, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return -1;
  }

  size_t           rawLen;
  const char*      rawKey = rocksdb_iter_key(pCur->iter, (size_t*)&rawLen);
  SStateSessionKey found = {0};
  stateSessionKeyDecode(&found, (char*)rawKey);

  int32_t order = stateSessionKeyCmpr(&target, sizeof(target), &found, sizeof(found));

  // resKey is reused across both lookups, exactly as the stored key leaves it.
  SSessionKey resKey = *key;
  int32_t     ret = -1;
  bool        matched = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL) == 0 &&
                 sessionRangeKeyCmpr(key, &resKey) == 0;

  if (!matched && order != 0) {
    if (order > 0) {
      streamStateCurNext_rocksdb(pCur);
    } else {
      streamStateCurPrev(pState, pCur);
    }
    matched = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL) == 0 &&
              sessionRangeKeyCmpr(key, &resKey) == 0;
  }

  if (matched) {
    *curKey = resKey;
    ret = 0;
  }

  streamStateFreeCur(pCur);
  return ret;
}
4189
#endif
4190

4191
/*
 * Look for an existing session that overlaps `key` widened by `gap` on both sides; report a
 * new session when there is none.
 *
 * key    in: the session to look up; out: the matching stored session key, or the original
 *        key when nothing matched.
 * gap    widening applied to both ends of key->win for the overlap test.
 * pVal   out: always replaced by a freshly allocated buffer of the requested size. It holds
 *        the stored value on a match and is all zero otherwise. Whatever *pVal pointed to
 *        before is freed.
 * pVLen  in: requested buffer size; out: left as written by streamStateSessionGetKVByCur_rocksdb.
 *
 * Returns 0 when an existing session matched, 1 when none did, and terrno when the buffer
 * cannot be allocated.
 */
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                                int32_t* pVLen) {
  stDebug("streamStateSessionAddIfNotExist_rocksdb");
  // todo refactor
  int32_t     res = 0;
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;
  int32_t valSize = *pVLen;

  void* tmp = taosMemoryMalloc(valSize);
  if (tmp == NULL) {
    return terrno;
  }
  // Zero the buffer up front so a stored value shorter than valSize never leaves
  // uninitialized bytes behind, and the "not found" result is already in place.
  memset(tmp, 0, valSize);

  // First candidate: the entry at or before the key.
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);

  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      goto _found;
    }
    taosMemoryFreeClear(*pVal);
    streamStateCurNext_rocksdb(pCur);
  } else {
    *key = originKey;
    streamStateFreeCur(pCur);
    taosMemoryFreeClear(*pVal);
    pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
  }

  // Second candidate: the following entry.
  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
  if (code == 0 && sessionRangeKeyCmpr(&searchKey, key) == 0) {
    goto _found;
  }

  *key = originKey;
  res = 1;
  goto _end;

_found:
  // Never copy more than the buffer holds: the stored value length comes from the database
  // and is not guaranteed to equal the size the caller asked for.
  if (*pVLen > valSize) {
    stError("[StreamInternal] session value len:%d exceeds requested size:%d, copy truncated", *pVLen, valSize);
  }
  if (*pVal != NULL) {
    int32_t copyLen = (*pVLen < valSize) ? *pVLen : valSize;
    if (copyLen > 0) {
      memcpy(tmp, *pVal, copyLen);
    }
  }

_end:
  taosMemoryFree(*pVal);
  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
}
4243
/*
 * Overwrite stored session values with zeros, starting from the all-zero session key.
 *
 * Walks forward from the cursor returned by streamStateSessionSeekKeyCurrentNext_rocksdb and
 * rewrites each value as `size` zero bytes. The walk stops at the first entry that cannot be
 * read or whose value is empty.
 */
void streamStateSessionClear_rocksdb(SStreamState* pState) {
  stDebug("streamStateSessionClear_rocksdb");
  SSessionKey      startKey = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &startKey);

  for (;;) {
    SSessionKey delKey = {0};
    void*       buf = NULL;
    int32_t     size = 0;
    int32_t     code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &delKey, &buf, &size);
    if (code != 0 || size <= 0) {
      taosMemoryFreeClear(buf);
      break;
    }

    memset(buf, 0, size);
    // refactor later
    TAOS_UNUSED(streamStateSessionPut_rocksdb(pState, &delKey, buf, size));
    taosMemoryFreeClear(buf);

    streamStateCurNext_rocksdb(pCur);
  }
  streamStateFreeCur(pCur);
}
118✔
4267
/*
 * Look for an existing state window that `key` belongs to; report a new window otherwise.
 *
 * Two stored entries are tried: the one at or before `key`, then the following one. The
 * first is accepted when its window covers `key` or when fn(pKeyData, stateKey) is true; the
 * second only on the fn test. stateKey is read from the last keyDataLen bytes of a value
 * buffer assumed to be valSize (the incoming *pVLen) bytes long.
 *
 * Returns 0 with *key / *pVal / *pVLen describing the accepted entry, or 1 with *key restored
 * to its original value and *pVal freed and cleared.
 */
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
                                              int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  stDebug("streamStateStateAddIfNotExist_rocksdb");
  // todo refactor
  int32_t     res = 0;
  SSessionKey originKey = *key;
  int32_t     valSize = *pVLen;

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
  if (code != 0) {
    // Nothing usable at or before the key: restart from the entry after it.
    *key = originKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
  } else {
    // The state comparison only runs when the stored window does not already cover the key.
    if ((key->win.skey <= originKey.win.skey && originKey.win.ekey <= key->win.ekey) ||
        fn(pKeyData, (char*)(*pVal) + (valSize - keyDataLen)) == true) {
      goto _end;
    }
    streamStateCurNext_rocksdb(pCur);
  }

  taosMemoryFreeClear(*pVal);
  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
  if (code == 0 && fn(pKeyData, (char*)(*pVal) + (valSize - keyDataLen)) == true) {
    goto _end;
  }
  taosMemoryFreeClear(*pVal);

  *key = originKey;
  res = 1;

_end:
  if (res == 0 && valSize > *pVLen){
    stError("[StreamInternal] [skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],valSize:%d bigger than get rocksdb len:%d", key->win.skey, key->win.ekey, key->groupId, valSize, *pVLen);
  }
  streamStateFreeCur(pCur);
  return res;
}
4313

4314
//  partag cf
4315
/*
 * Store the tag bytes for `groupId` in the "partag" column family.
 *
 * When a resultRowPut callback, pExprSupp and `tag` are all non-NULL, the tag is first
 * converted by resultRowPut and the converted buffer is stored, then freed. Otherwise the
 * raw tag bytes are stored. Returns the resultRowPut error code when conversion fails, else
 * `code`, which is presumably set by STREAM_STATE_PUT_ROCKSDB (defined outside this chunk).
 */
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
  int    code = 0;  // referenced by name from the macro below
  char*  dst = NULL;
  size_t size = 0;

  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL && tag != NULL) {
    code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, tag, tagLen, &dst, &size);
    if (code != 0) {
      return code;
    }
    STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, dst, (int32_t)size);
    taosMemoryFree(dst);
    return code;
  }

  // No converter available (or no tag): store the bytes as given.
  STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
  return code;
}
4331

4332
/*
 * Initialise the caller-provided cursor on the "partag" column family and position it after
 * `groupId`.
 *
 * Fills pCur->number, pCur->db and pCur->iter, seeks to the encoded groupId, steps forward
 * over stale entries, and advances one more entry unless the decoded group id is already
 * greater than groupId. Nothing is returned; callers inspect the cursor afterwards. A NULL
 * pCur is ignored.
 */
void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t groupId, SStreamStateCur* pCur) {
  if (pCur == NULL) return;

  // When pTdbState->recalc is set, the owner's pRecalBackend is used instead of pBackend.
  STaskDbWrapper* pDb = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) pDb = pState->pTdbState->pOwner->pRecalBackend;

  pCur->number = pState->number;
  pCur->db = pDb->db;
  pCur->iter = streamStateIterCreate(pState, "partag", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);

  int cfIdx = streamStateGetCfIdx(pState, "partag");
  if (cfIdx < 0) {
    stError("streamState failed to put to cf name:%s", "partag");
    return;
  }

  char    encoded[128] = {0};
  int32_t encodedLen = ginitDict[cfIdx].enFunc((void*)&groupId, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) return;

  // skip ttl expired data
  for (; rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter); rocksdb_iter_next(pCur->iter)) {
  }

  if (!rocksdb_iter_valid(pCur->iter)) return;

  int64_t foundGroupId;
  size_t  foundLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
  TAOS_UNUSED(parKeyDecode((void*)&foundGroupId, rawKey));
  if (foundGroupId <= groupId) {
    rocksdb_iter_next(pCur->iter);
  }
}
4371

4372
/*
 * Read the group id, and optionally the tag value, the "partag" cursor currently points at.
 *
 * pCur      cursor to read from; NULL yields -1.
 * pGroupId  out: the decoded group id.
 * pVal      optional out: receives the buffer produced by valueDecode. When NULL the value
 *           is not decoded and pVLen is left untouched.
 * pVLen     optional out: the decoded value length.
 *
 * Returns 0 on success and -1 when the cursor is NULL, the iterator is invalid or stale, or
 * valueDecode reports a negative length.
 */
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, void** pVal, int32_t* pVLen) {
  // The message used to name the fill variant, which made traces misleading.
  stDebug("streamStateParTagGetKVByCur_rocksdb");
  if (!pCur) {
    return -1;
  }
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    return -1;
  }

  size_t klen = 0, vlen = 0;
  char*  keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
  (void)parKeyDecode(pGroupId, keyStr);

  if (pVal) {
    const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
    int32_t     len = valueDecode((void*)valStr, vlen, NULL, (char**)pVal);
    if (len < 0) {
      return -1;
    }
    if (pVLen != NULL) *pVLen = len;
  }

  return 0;
}
4397

4398
/*
 * Fetch the tag stored for `groupId` in the "partag" column family and convert it with the
 * resultRowGet callback.
 *
 * tagVal  out: receives the converted tag buffer from resultRowGet.
 * tagLen  optional out: the converted length, written only when the conversion succeeds.
 *
 * Returns the lookup error left in `code` by STREAM_STATE_GET_ROCKSDB (defined outside this
 * chunk), otherwise the resultRowGet return code.
 */
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
  int    code = 0;     // referenced by name from the macro below
  char*  tVal = NULL;  // must start NULL: it is freed on the lookup-failure path
  size_t tValLen = 0;
  STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, &tVal, &tValLen);
  if (code != 0) {
    taosMemoryFree(tVal);
    return code;
  }

  // resultRowGet writes a size_t; go through a local so it never writes past the caller's
  // 32-bit tagLen. It starts at the raw length, as in streamStateSessionGetKVByCur_rocksdb.
  size_t userLen = tValLen;
  code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, &userLen);
  taosMemoryFree(tVal);
  if (code == 0 && tagLen != NULL) {
    *tagLen = (int32_t)userLen;
  }

  return code;
}
4412

4413
// parname cf
4414
/*
 * Store the table name for `groupId` in the "parname" column family. Exactly
 * TSDB_TABLE_NAME_LEN bytes of `tbname` are written.
 *
 * The work is done by the STREAM_STATE_PUT_ROCKSDB macro (defined outside this chunk), which
 * is presumably what updates `code`; the function returns `code`, which starts at 0.
 */
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  int code = 0;  // referenced by name from the macro below

  STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN);

  return code;
}
4419
/*
 * Look up the table name stored for `groupId` in the "parname" column family and hand it out
 * through pVal. The value length is collected in a local and not reported to the caller.
 *
 * The work is done by the STREAM_STATE_GET_ROCKSDB macro (defined outside this chunk), which
 * is presumably what updates `code`; the function returns `code`, which starts at 0.
 */
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) {
  size_t tagLen;
  int    code = 0;  // referenced by name from the macro below

  STREAM_STATE_GET_ROCKSDB(pState, "parname", &groupId, pVal, &tagLen);

  return code;
}
4425

4426
/*
 * Delete the table name stored for `groupId` from the "parname" column family.
 *
 * The work is done by the STREAM_STATE_DEL_ROCKSDB macro (defined outside this chunk), which
 * is presumably what updates `code`; the function returns `code`, which starts at 0.
 */
int32_t streamStateDeleteParName_rocksdb(SStreamState* pState, int64_t groupId) {
  int code = 0;  // referenced by name from the macro below

  STREAM_STATE_DEL_ROCKSDB(pState, "parname", &groupId);

  return code;
}
4431

4432
/*
 * Write pVal (pVLen bytes) under `key` in the "default" column family.
 *
 * The work is done by the STREAM_STATE_PUT_ROCKSDB macro (defined outside this chunk), which
 * is presumably what updates `code`; the function returns `code`, which starts at 0.
 */
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
  int code = 0;  // referenced by name from the macro below

  STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);

  return code;
}
4437
/*
 * Look up `key` in the "default" column family, handing the result out through pVal / pVLen.
 *
 * The work is done by the STREAM_STATE_GET_ROCKSDB macro (defined outside this chunk), which
 * is presumably what updates `code`; the function returns `code`, which starts at 0.
 */
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
  int code = 0;  // referenced by name from the macro below

  STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen);

  return code;
}
4442
/*
 * Delete `key` from the "default" column family.
 *
 * The work is done by the STREAM_STATE_DEL_ROCKSDB macro (defined outside this chunk), which
 * is presumably what updates `code`; the function returns `code`, which starts at 0.
 */
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
  int code = 0;  // referenced by name from the macro below

  STREAM_STATE_DEL_ROCKSDB(pState, "default", key);

  return code;
}
4447

4448
/*
 * Scan the "default" column family from `start` and collect the int64 suffixes of matching
 * keys into `result`.
 *
 * start   NUL-terminated prefix; the scan begins at this key and stops at the first key that
 *         does not start with it.
 * end     optional NUL-terminated upper bound; the scan stops at the first key that compares
 *         greater (strcmp).
 * result  array receiving one int64_t per key from which a ":<number>" suffix is parsed.
 *
 * Entries whose value cannot be decoded are skipped. Returns 0 on success, -1 when the
 * iterator cannot be created, or terrno when pushing to `result` fails.
 */
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
  int code = 0;

  // When pTdbState->recalc is set, the owner's pRecalBackend is used instead of pBackend.
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }
  rocksdb_snapshot_t*    snapshot = NULL;
  rocksdb_readoptions_t* readopts = NULL;
  rocksdb_iterator_t*    pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
  if (pIter == NULL) {
    return -1;
  }

  // The prefix length does not change during the scan; compute it once.
  const size_t startLen = strlen(start);
  size_t       klen = 0;
  rocksdb_iter_seek(pIter, start, startLen);
  while (rocksdb_iter_valid(pIter)) {
    const char* key = rocksdb_iter_key(pIter, &klen);
    // rocksdb_iter_value stores a size_t; a narrower local would be written out of bounds.
    size_t      vlen = 0;
    const char* vval = rocksdb_iter_value(pIter, &vlen);
    int32_t     len = valueDecode((void*)vval, vlen, NULL, NULL);
    if (len < 0) {
      rocksdb_iter_next(pIter);
      continue;
    }

    if (end != NULL && strcmp(key, end) > 0) {
      break;
    }
    if (strncmp(key, start, startLen) == 0 && strlen(key) >= startLen + 1) {
      int64_t checkPoint = 0;
      // NOTE(review): `key + strlen(key)` points at the terminator, so this sscanf can never
      // match and `result` is never filled. The intent looks like `key + startLen`; left
      // unchanged here because callers may rely on the current (empty) result - confirm.
      // The key is also treated as NUL-terminated although klen is available - verify.
      if (sscanf(key + strlen(key), ":%" PRId64, &checkPoint) == 1) {
        if (taosArrayPush(result, &checkPoint) == NULL) {
          code = terrno;
          break;
        }
      }
    } else {
      break;
    }
    rocksdb_iter_next(pIter);
  }
  rocksdb_release_snapshot(wrapper->db, snapshot);
  rocksdb_readoptions_destroy(readopts);
  rocksdb_iter_destroy(pIter);
  return code;
}
4496
#ifdef BUILD_NO_CALL
4497
/*
 * Create a cursor with an iterator over the "default" column family.
 *
 * Returns the cursor (an SStreamStateCur*) as an opaque pointer, or NULL when the cursor
 * cannot be created.
 */
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
  SStreamStateCur* pCur = createStreamStateCursor();
  // Bail out before touching the cursor, like the other cursor constructors in this file.
  if (pCur == NULL) {
    return NULL;
  }

  // When pTdbState->recalc is set, the owner's pRecalBackend is used instead of pBackend.
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;
  return pCur;
}
4510
bool streamDefaultIterValid_rocksdb(void* iter) {
4511
  if (iter) {
4512
    return false;
4513
  }
4514
  SStreamStateCur* pCur = iter;
4515
  return (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) ? true : false;
4516
}
4517
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
4518
  SStreamStateCur* pCur = iter;
4519
  rocksdb_iter_seek(pCur->iter, key, strlen(key));
4520
}
4521
void streamDefaultIterNext_rocksdb(void* iter) {
4522
  SStreamStateCur* pCur = iter;
4523
  rocksdb_iter_next(pCur->iter);
4524
}
4525
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
4526
  SStreamStateCur* pCur = iter;
4527
  return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len);
4528
}
4529
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
4530
  SStreamStateCur* pCur = iter;
4531
  char*            ret = NULL;
4532

4533
  int32_t     vlen = 0;
4534
  const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vlen);
4535
  *len = valueDecode((void*)val, vlen, NULL, &ret);
4536
  if (*len < 0) {
4537
    taosMemoryFree(ret);
4538
    return NULL;
4539
  }
4540

4541
  return ret;
4542
}
4543
#endif
4544
// batch func
4545
/* Create a RocksDB write batch and return it as an opaque pointer. */
void* streamStateCreateBatch() {
  return rocksdb_writebatch_create();
}
4549
/* Return rocksdb_writebatch_count() for the batch, or 0 for a NULL batch. */
int32_t streamStateGetBatchSize(void* pBatch) {
  return (pBatch != NULL) ? rocksdb_writebatch_count(pBatch) : 0;
}
4553

4554
void    streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
2,205✔
4555
void    streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
2,890✔
4556
// Encodes one key/value pair for the column family named `cfKeyName` and queues it on `pBatch`.
//
// The key is encoded with the column family's enFunc into a 128-byte stack buffer and the
// value (plus `ttl`) with its enValueFunc into a heap buffer that is freed before returning.
// Returns 0 on success, -1 when `cfKeyName` cannot be resolved to a column-family index.
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
                            void* val, int32_t vlen, int64_t ttl) {
  // States flagged `recalc` write to the recalculation backend instead of the main one.
  STaskDbWrapper* pDb =
      pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend : pState->pTdbState->pOwner->pBackend;
  TAOS_UNUSED(atomic_add_fetch_64(&pDb->dataWritten, 1));

  int cfIdx = streamStateGetCfIdx(pState, cfKeyName);
  if (cfIdx < 0) {
    stError("streamState failed to put to cf name:%s", cfKeyName);
    return -1;
  }

  char    encKey[128] = {0};
  int32_t encKeyLen = ginitDict[cfIdx].enFunc((void*)key, encKey);

  char*   encVal = NULL;
  int32_t encValLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &encVal);

  rocksdb_column_family_handle_t* pHandle = pDb->pCf[ginitDict[cfIdx].idx];
  rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pHandle, encKey, (size_t)encKeyLen, encVal,
                            (size_t)encValLen);
  taosMemoryFree(encVal);

  // Render the key in readable form purely for the trace line.
  char keyStr[256] = {0};
  TAOS_UNUSED(ginitDict[cfIdx].toStrFunc((void*)key, keyStr));
  stTrace("streamState str: %s succ to write to %s_%s, len: %d", keyStr, pDb->idstr, ginitDict[cfIdx].key, vlen);

  return 0;
}
4588

4589
// Variant of streamStatePutBatch() that takes an already-resolved column-family index and an
// optional scratch buffer for the encoded value.
//
// When both pResultRowStore.resultRowPut and pExprSupp are set, `val` is first converted by
// resultRowPut into a temporary buffer (freed here); otherwise `val` is written as is.
// `tmpBuf` is handed to enValueFunc as the initial output pointer; the encoded value is freed
// here only when `tmpBuf` is NULL.
// Returns 0 on success or the non-zero code from resultRowPut.
int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key,
                                    void* val, int32_t vlen, int64_t ttl, void* tmpBuf) {
  char encKey[128] = {0};
  char keyStr[128] = {0};

  char*  payload = NULL;
  size_t payloadLen = 0;
  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
    int32_t code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &payload, &payloadLen);
    if (code != 0) {
      return code;
    }
  } else {
    payload = val;
    payloadLen = vlen;
  }

  int32_t encKeyLen = ginitDict[cfIdx].enFunc((void*)key, encKey);

  ginitDict[cfIdx].toStrFunc((void*)key, keyStr);
  stTrace("[StreamInternal] write cfIdx:%d key:%s user len:%d, rocks len:%zu", cfIdx, keyStr, vlen, payloadLen);

  char*   encVal = tmpBuf;
  int32_t encValLen = ginitDict[cfIdx].enValueFunc(payload, payloadLen, ttl, &encVal);

  // States flagged `recalc` write to the recalculation backend instead of the main one.
  STaskDbWrapper* pDb =
      pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend : pState->pTdbState->pOwner->pBackend;

  TAOS_UNUSED(atomic_add_fetch_64(&pDb->dataWritten, 1));

  rocksdb_column_family_handle_t* pHandle = pDb->pCf[ginitDict[cfIdx].idx];
  rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pHandle, encKey, (size_t)encKeyLen, encVal,
                            (size_t)encValLen);

  // `payload` is a private buffer only when resultRowPut produced it.
  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
    taosMemoryFree(payload);
  }

  // Without a caller-supplied scratch buffer the encoded value is released here.
  if (tmpBuf == NULL) {
    taosMemoryFree(encVal);
  }

  char traceKey[256] = {0};
  TAOS_UNUSED(ginitDict[cfIdx].toStrFunc((void*)key, traceKey));
  stTrace("streamState str: %s succ to write to %s_%s", traceKey, pDb->idstr, ginitDict[cfIdx].key);

  return 0;
}
4639
// Writes the queued batch to the backend selected by the state's `recalc` flag.
// Returns 0 on success; logs the RocksDB error, frees it and returns -1 on failure.
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
  STaskDbWrapper* pDb =
      pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend : pState->pTdbState->pOwner->pBackend;
  TAOS_UNUSED(atomic_add_fetch_64(&pDb->dataWritten, 1));

  char* errMsg = NULL;
  rocksdb_write(pDb->db, pDb->writeOpt, (rocksdb_writebatch_t*)pBatch, &errMsg);
  if (errMsg == NULL) {
    stDebug("write batch to backend:%p", pDb->db);
    return 0;
  }

  stError("streamState failed to write batch, err:%s", errMsg);
  taosMemoryFree(errMsg);
  return -1;
}
4656
// Rounds `x` up to a power of two. Inputs 0 and 1 both yield 2; an exact power of two
// (>= 2) is returned unchanged. For x above 2^31 the unsigned addition wraps to 0.
uint32_t nextPow2(uint32_t x) {
  if (x <= 1) {
    return 2;
  }

  // Smear the highest set bit of (x - 1) into every lower position, then add one.
  uint32_t v = x - 1;
  for (uint32_t shift = 1; shift <= 16; shift <<= 1) {
    v |= v >> shift;
  }
  return v + 1;
}
4666

4667
#ifdef BUILD_NO_CALL
4668
int32_t copyFiles(const char* src, const char* dst) {
4669
  int32_t code = 0;
4670
  // opt later, just hard link
4671
  int32_t sLen = strlen(src);
4672
  int32_t dLen = strlen(dst);
4673
  char*   srcName = taosMemoryCalloc(1, sLen + 64);
4674
  char*   dstName = taosMemoryCalloc(1, dLen + 64);
4675

4676
  TdDirPtr pDir = taosOpenDir(src);
4677
  if (pDir == NULL) {
4678
    taosMemoryFree(srcName);
4679
    taosMemoryFree(dstName);
4680
    return -1;
4681
  }
4682

4683
  TdDirEntryPtr de = NULL;
4684
  while ((de = taosReadDir(pDir)) != NULL) {
4685
    char* name = taosGetDirEntryName(de);
4686
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
4687

4688
    sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
4689
    sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
4690
    if (!taosDirEntryIsDir(de)) {
4691
      code = taosCopyFile(srcName, dstName);
4692
      if (code == -1) {
4693
        goto _err;
4694
      }
4695
    }
4696

4697
    memset(srcName, 0, sLen + 64);
4698
    memset(dstName, 0, dLen + 64);
4699
  }
4700

4701
_err:
4702
  taosMemoryFreeClear(srcName);
4703
  taosMemoryFreeClear(dstName);
4704
  taosCloseDir(&pDir);
4705
  return code >= 0 ? 0 : -1;
4706
}
4707
#endif
4708

4709
// Returns 1 when the first `len` bytes of `name` are a RocksDB metadata file name, i.e.
// exactly "CURRENT" or anything starting with "MANIFEST-"; returns 0 otherwise
// (including a NULL name or a non-positive length).
int32_t isBkdDataMeta(char* name, int32_t len) {
  static const char kCurrent[] = "CURRENT";
  static const char kManifest[] = "MANIFEST-";
  const int32_t     currLen = (int32_t)(sizeof(kCurrent) - 1);
  const int32_t     maniLen = (int32_t)(sizeof(kManifest) - 1);

  if (name == NULL || len <= 0) {
    return 0;
  }

  if (len >= maniLen && strncmp(name, kManifest, (size_t)maniLen) == 0) {
    return 1;
  }

  // Compare at most `len` bytes: callers pass keys with an explicit length, so the
  // comparison must not depend on a terminating NUL after the key.
  if (len == currLen && strncmp(name, kCurrent, (size_t)currLen) == 0) {
    return 1;
  }
  return 0;
}
4723
// Appends to `diff` a heap-allocated copy of every key of `p2` that is neither a metadata
// name (see isBkdDataMeta) nor present in `p1`.
// Returns 0 on success or terrno when an allocation or array push fails.
// NOTE(review): the error returns leave the hash iteration unfinished - confirm whether the
// hash API requires the iterator to be cancelled in that case.
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
  for (void* it = taosHashIterate(p2, NULL); it != NULL; it = taosHashIterate(p2, it)) {
    size_t keyLen = 0;
    char*  key = taosHashGetKey(it, &keyLen);
    if (isBkdDataMeta(key, keyLen) || taosHashGet(p1, key, keyLen) != NULL) {
      continue;
    }

    // One extra byte so the copy is always NUL-terminated.
    int32_t cap = keyLen + 1;
    char*   copy = taosMemoryCalloc(1, cap);
    if (copy == NULL) {
      return terrno;
    }
    tstrncpy(copy, key, cap);

    if (taosArrayPush(diff, &copy) == NULL) {
      taosMemoryFree(copy);
      return terrno;
    }
  }
  return 0;
}
4745
// Computes the two-way difference between key sets `p1` and `p2` (metadata names excluded):
//   add <- keys present in p2 but not in p1
//   del <- keys present in p1 but not in p2
// Returns 0 when both passes succeed, otherwise the code of the first failing pass.
int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
  int32_t code = compareHashTableImpl(p1, p2, add);
  // Only run the second pass when the first one succeeded.
  if (code == 0) {
    code = compareHashTableImpl(p2, p1, del);
  }

  return code;
}
4755

4756
// Builds a comma-separated list of the keys in `pTbl` for debug output.
// On success *buf receives a heap buffer (empty string for an empty table) that the caller
// frees. On allocation failure, or when the list does not fit the sz * 16 + 4 byte budget,
// an error is logged and *buf is left untouched.
void hashTableToDebug(SHashObj* pTbl, char** buf) {
  size_t  sz = taosHashGetSize(pTbl);
  int32_t total = 0;
  int32_t cap = sz * 16 + 4;

  char* p = taosMemoryCalloc(1, cap);
  if (p == NULL) {
    stError("failed to alloc memory for stream snapshot debug info");
    return;
  }

  void* pIter = taosHashIterate(pTbl, NULL);
  while (pIter) {
    size_t len = 0;
    char*  name = taosHashGetKey(pIter, &len);
    if (name != NULL && len > 0) {
      int32_t left = cap - total;
      // "%.*s" prints at most `len` bytes of the key, so no temporary NUL-terminated
      // copy (and no extra allocation that could fail) is needed.
      int32_t nBytes = snprintf(p + total, left, "%.*s,", (int)len, name);
      if (nBytes <= 0 || nBytes >= left) {
        stError("failed to debug snapshot info since %s", tstrerror(TSDB_CODE_OUT_OF_RANGE));
        taosMemoryFree(p);
        return;
      }
      total += nBytes;
    }

    pIter = taosHashIterate(pTbl, pIter);
  }

  // Drop the trailing comma.
  if (total > 0) {
    p[total - 1] = 0;
  }
  *buf = p;
}
4795
// Joins the strings stored in `pArr` with commas for debug output.
// *buf receives a heap buffer owned by the caller. It is left untouched when the array is
// empty, when allocation fails, or when the result exceeds the 64 + 64 * count byte budget.
void strArrayDebugInfo(SArray* pArr, char** buf) {
  int32_t count = taosArrayGetSize(pArr);
  if (count <= 0) {
    return;
  }

  int32_t cap = 64 + count * 64;
  char*   out = (char*)taosMemoryCalloc(1, cap);
  if (out == NULL) {
    stError("failed to alloc memory for stream snapshot debug info");
    return;
  }

  int32_t used = 0;
  for (int i = 0; i < count; i++) {
    char*   item = taosArrayGetP(pArr, i);
    int32_t room = cap - used;
    int32_t written = snprintf(out + used, room, "%s,", item);
    if (written <= 0 || written >= room) {
      stError("failed to debug snapshot info since %s", tstrerror(TSDB_CODE_OUT_OF_RANGE));
      taosMemoryFree(out);
      return;
    }
    used += written;
  }

  // Overwrite the trailing comma with the terminator.
  out[used - 1] = 0;
  *buf = out;
}
4827
// When the DEBUG_INFO bit of stDebugFlag is set, traces four lists for the checkpoint
// tracker: the sst table at index `idx`, the sst table at index `1 - idx`, and the pending
// add and delete lists. Does nothing otherwise.
void dbChkpDebugInfo(SDbChkp* pDb) {
  if ((stDebugFlag & DEBUG_INFO) == 0) {
    return;
  }

  char* prevFiles = NULL;
  char* currFiles = NULL;
  char* addedFiles = NULL;
  char* deletedFiles = NULL;

  hashTableToDebug(pDb->pSstTbl[pDb->idx], &prevFiles);
  if (prevFiles) stTrace("chkp previous file: [%s]", prevFiles);

  hashTableToDebug(pDb->pSstTbl[1 - pDb->idx], &currFiles);
  if (currFiles) stTrace("chkp curr file: [%s]", currFiles);

  strArrayDebugInfo(pDb->pAdd, &addedFiles);
  if (addedFiles) stTrace("chkp newly addded file: [%s]", addedFiles);

  strArrayDebugInfo(pDb->pDel, &deletedFiles);
  if (deletedFiles) stTrace("chkp newly deleted file: [%s]", deletedFiles);

  // Any list that was not produced is still NULL here.
  taosMemoryFree(prevFiles);
  taosMemoryFree(currFiles);
  taosMemoryFree(addedFiles);
  taosMemoryFree(deletedFiles);
}
3✔
4848
// Scans <path>/checkpoints/checkpoint<chkpId> and refreshes the tracker `p`:
//   - remembers the CURRENT and MANIFEST-* file names,
//   - rebuilds the "new" sst table (pSstTbl[1 - idx]) from the *.sst files found,
//   - on the first call (init == 0) puts every sst file into pAdd; afterwards fills pAdd/pDel
//     with the difference against the previous table,
//   - finally flips `idx` so the new table becomes the reference for the next call.
// The whole operation runs under p->rwLock (write), which is released on every exit path.
// `list` is accepted for interface compatibility and is not used.
// Returns 0 on success, otherwise an error code.
int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
  int32_t     code = 0;
  int32_t     nBytes = 0;
  TdDirPtr    pDir = NULL;
  const char* pCurrent = "CURRENT";
  const char* pManifest = "MANIFEST-";
  const char* pSST = ".sst";
  size_t      currLen = strlen(pCurrent);
  size_t      maniLen = strlen(pManifest);
  size_t      sstLen = strlen(pSST);

  TAOS_UNUSED(taosThreadRwlockWrlock(&p->rwLock));

  p->preCkptId = p->curChkpId;
  p->curChkpId = chkpId;

  memset(p->buf, 0, p->len);

  nBytes =
      snprintf(p->buf, p->len, "%s%s%s%scheckpoint%" PRId64, p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
  if (nBytes <= 0 || nBytes >= p->len) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _exit;
  }

  taosArrayClearP(p->pAdd, NULL);
  taosArrayClearP(p->pDel, NULL);
  taosHashClear(p->pSstTbl[1 - p->idx]);

  pDir = taosOpenDir(p->buf);
  if (pDir == NULL) {
    code = terrno;
    goto _exit;
  }

  TdDirEntryPtr de = NULL;
  int8_t        dummy = 0;
  while ((de = taosReadDir(pDir)) != NULL) {
    char* name = taosGetDirEntryName(de);
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;

    size_t nameLen = strlen(name);

    if (nameLen == currLen && strcmp(name, pCurrent) == 0) {
      taosMemoryFreeClear(p->pCurrent);

      p->pCurrent = taosStrdup(name);
      if (p->pCurrent == NULL) {
        code = terrno;
        break;
      }
      continue;
    }

    if (nameLen >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
      taosMemoryFreeClear(p->pManifest);
      p->pManifest = taosStrdup(name);
      if (p->pManifest == NULL) {
        code = terrno;
        break;
      }
      continue;
    }

    if (nameLen >= sstLen && strncmp(name + nameLen - sstLen, pSST, sstLen) == 0) {
      // Propagate a failed insert: a silently incomplete table would make files that are
      // still present look deleted in the next comparison.
      code = taosHashPut(p->pSstTbl[1 - p->idx], name, nameLen, &dummy, sizeof(dummy));
      if (code != 0) {
        break;
      }
      continue;
    }
  }
  TAOS_UNUSED(taosCloseDir(&pDir));
  if (code != 0) {
    goto _exit;
  }

  if (p->init == 0) {
    // First scan: everything found is "added".
    void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
    while (pIter) {
      size_t len = 0;
      char*  name = taosHashGetKey(pIter, &len);
      if (name != NULL && !isBkdDataMeta(name, len)) {
        int32_t cap = len + 1;
        char*   fname = taosMemoryCalloc(1, cap);
        if (fname == NULL) {
          code = terrno;
          goto _exit;
        }

        tstrncpy(fname, name, cap);
        if (taosArrayPush(p->pAdd, &fname) == NULL) {
          code = terrno;  // read before the free below can disturb it
          taosMemoryFree(fname);
          goto _exit;
        }
      }
      pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
    }
    if (taosArrayGetSize(p->pAdd) > 0) p->update = 1;

    p->init = 1;
    p->preCkptId = -1;
    p->curChkpId = chkpId;
  } else {
    code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel);
    if (code != 0) {
      // Discard the partial delta; the lock is released at _exit.
      taosArrayClearP(p->pAdd, NULL);
      taosArrayClearP(p->pDel, NULL);
      taosHashClear(p->pSstTbl[1 - p->idx]);
      p->update = 0;
      goto _exit;
    }

    if (taosArrayGetSize(p->pAdd) == 0 && taosArrayGetSize(p->pDel) == 0) {
      p->update = 0;
    }

    // NOTE(review): curChkpId was already set to chkpId at the top of this function, so this
    // leaves preCkptId == chkpId - confirm that is the intended value.
    p->preCkptId = p->curChkpId;
    p->curChkpId = chkpId;
  }

  dbChkpDebugInfo(p);

  p->idx = 1 - p->idx;

_exit:
  TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
  return code;
}
4975

4976
void dbChkpDestroy(SDbChkp* pChkp);
4977

4978
// Creates a checkpoint tracker rooted at `path` and performs the initial scan for
// checkpoint `initChkpId`.
//
// Ownership of `path`: on success the tracker keeps the pointer and dbChkpDestroy() frees it.
// On failure the pointer is detached before cleanup, so the caller still owns `path` and
// remains responsible for freeing it.
// Returns 0 and stores the tracker in *ppChkp on success; otherwise returns an error code
// and leaves *ppChkp untouched.
int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) {
  int32_t  code = 0;
  SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp));
  if (p == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->curChkpId = initChkpId;
  p->preCkptId = -1;
  p->pSST = taosArrayInit(64, sizeof(void*));
  if (p->pSST == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->path = path;
  // Scratch buffer for building checkpoint paths under `path`.
  p->len = strlen(path) + 128;
  p->buf = taosMemoryCalloc(1, p->len);
  if (p->buf == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->idx = 0;
  p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (p->pSstTbl[0] == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (p->pSstTbl[1] == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->pAdd = taosArrayInit(64, sizeof(void*));
  if (p->pAdd == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->pDel = taosArrayInit(64, sizeof(void*));
  if (p->pDel == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->update = 0;
  TAOS_UNUSED(taosThreadRwlockInit(&p->rwLock, NULL));

  code = dbChkpGetDelta(p, initChkpId, NULL);
  if (code != 0) {
    goto _EXIT;
  }
  *ppChkp = p;
  return code;

_EXIT:
  if (p != NULL) {
    // dbChkpDestroy() frees p->path; detach it so a caller that frees `path` after a failed
    // create does not free it a second time.
    p->path = NULL;
  }
  dbChkpDestroy(p);
  return code;
}
5042

5043
// Releases a checkpoint tracker together with everything it holds: the path and scratch
// buffers, the remembered CURRENT/MANIFEST names, the three file-name arrays and both sst
// tables. A NULL tracker is ignored.
void dbChkpDestroy(SDbChkp* pChkp) {
  if (pChkp != NULL) {
    taosMemoryFree(pChkp->buf);
    taosMemoryFree(pChkp->path);

    taosArrayDestroyP(pChkp->pSST, NULL);
    taosArrayDestroyP(pChkp->pAdd, NULL);
    taosArrayDestroyP(pChkp->pDel, NULL);

    for (int i = 0; i < 2; i++) {
      taosHashCleanup(pChkp->pSstTbl[i]);
    }

    taosMemoryFree(pChkp->pCurrent);
    taosMemoryFree(pChkp->pManifest);
    taosMemoryFree(pChkp);
  }
}
5060
#ifdef BUILD_NO_CALL
5061
// Placeholder initialiser: performs no work and returns 0 for any argument, NULL included.
int32_t dbChkpInit(SDbChkp* p) {
  (void)p;
  return 0;
}
5065
#endif
5066
// Materialises the pending delta of tracker `p` into directory `dname`:
//   1. copies every file named in p->pAdd from the current checkpoint directory,
//   2. appends a heap copy of every name in p->pDel to `list` (the caller deletes them),
//   3. copies CURRENT and MANIFEST-* with a "_<curChkpId>" suffix,
//   4. writes a "META" file describing them plus the process version loaded from the
//      checkpoint's extra info,
//   5. clears pAdd/pDel on success.
// Returns 0 on success, otherwise an error code; the lock and scratch buffer are released
// on every path.
// NOTE(review): pAdd/pDel are cleared while only the read lock of p->rwLock is held -
// confirm callers serialise access by other means.
int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
  static char* chkpMeta = "META";
  int32_t      code = 0;

  TAOS_UNUSED(taosThreadRwlockRdlock(&p->rwLock));

  int32_t cap = p->len + 128;

  // One allocation carved into four path buffers of `cap` bytes each.
  char* buffer = taosMemoryCalloc(4, cap);
  if (buffer == NULL) {
    code = terrno;
    goto _ERROR;
  }

  char* srcBuf = buffer;
  char* dstBuf = &srcBuf[cap];
  char* srcDir = &dstBuf[cap];
  char* dstDir = &srcDir[cap];

  int nBytes = snprintf(srcDir, cap, "%s%s%s%s%s%" PRId64, p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP,
                        "checkpoint", p->curChkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  nBytes = snprintf(dstDir, cap, "%s", dname);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  if (!taosDirExist(srcDir)) {
    stError("failed to dump srcDir %s, reason: not exist such dir", srcDir);
    code = TSDB_CODE_INVALID_PARA;
    goto _ERROR;
  }
  int64_t chkpId = 0, processId = -1;
  code = chkpLoadExtraInfo(srcDir, &chkpId, &processId);
  if (code < 0) {
    // Report the actual error text for the failure code.
    stError("failed to load extra info from %s, reason:%s", srcDir, tstrerror(code));

    goto _ERROR;
  }

  // add file to $name dir
  for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) {
    memset(srcBuf, 0, cap);
    memset(dstBuf, 0, cap);

    char* filename = taosArrayGetP(p->pAdd, i);
    nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, filename);
    if (nBytes <= 0 || nBytes >= cap) {
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _ERROR;
    }

    nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, filename);
    if (nBytes <= 0 || nBytes >= cap) {
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _ERROR;
    }

    if (taosCopyFile(srcBuf, dstBuf) < 0) {
      code = TAOS_SYSTEM_ERROR(ERRNO);
      stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
      goto _ERROR;
    }
  }
  // del file in $name
  for (int i = 0; i < taosArrayGetSize(p->pDel); i++) {
    char* filename = taosArrayGetP(p->pDel, i);
    char* dup = taosStrdup(filename);
    if (dup == NULL) {
      code = terrno;
      goto _ERROR;
    }
    if (taosArrayPush(list, &dup) == NULL) {
      code = terrno;  // read before the free below can disturb it
      taosMemoryFree(dup);
      goto _ERROR;
    }
  }

  // copy current file to dst dir
  memset(srcBuf, 0, cap);
  memset(dstBuf, 0, cap);

  nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64, dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  if (taosCopyFile(srcBuf, dstBuf) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
    goto _ERROR;
  }

  // copy manifest file to dst dir
  memset(srcBuf, 0, cap);
  memset(dstBuf, 0, cap);

  nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64, dstDir, TD_DIRSEP, p->pManifest, p->curChkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  if (taosCopyFile(srcBuf, dstBuf) < 0) {
    code = terrno;
    stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
    goto _ERROR;
  }

  // write the META file
  memset(dstBuf, 0, cap);
  nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  TdFilePtr pFile = taosOpenFile(dstBuf, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    code = terrno;
    stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(code));
    goto _ERROR;
  }

  char content[256] = {0};
  nBytes = tsnprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest,
                     p->curChkpId, "processVer", processId);
  if (nBytes <= 0 || (size_t)nBytes >= sizeof(content)) {
    code = TSDB_CODE_OUT_OF_RANGE;
    stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir);
    TAOS_UNUSED(taosCloseFile(&pFile));
    goto _ERROR;
  }

  nBytes = taosWriteFile(pFile, content, strlen(content));
  if (nBytes != strlen(content)) {
    code = terrno;
    stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(code));
    TAOS_UNUSED(taosCloseFile(&pFile));
    goto _ERROR;
  }
  TAOS_UNUSED(taosCloseFile(&pFile));

  // clear delta data buf
  taosArrayClearP(p->pAdd, NULL);
  taosArrayClearP(p->pDel, NULL);
  code = 0;

_ERROR:
  taosMemoryFree(buffer);
  TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
  return code;
}
5236

5237
// Creates a backend checkpoint manager that keeps a private copy of `path` and an empty
// task -> tracker table.
// Returns 0 and stores the manager in *mgt on success; otherwise returns an error code,
// releases everything allocated so far and leaves *mgt untouched.
int32_t bkdMgtCreate(char* path, SBkdMgt** mgt) {
  int32_t  code = 0;
  SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt));
  if (p == NULL) {
    return terrno;
  }

  p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (p->pDbChkpTbl == NULL) {
    code = terrno;
    goto _err;
  }

  p->path = taosStrdup(path);
  if (p->path == NULL) {
    code = terrno;
    goto _err;
  }

  if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _err;
  }
  *mgt = p;

  return code;

_err:
  // The rwlock is never successfully initialised on a failure path, so the members are
  // released directly instead of going through bkdMgtDestroy(), which destroys the lock.
  taosMemoryFree(p->path);
  taosHashCleanup(p->pDbChkpTbl);
  taosMemoryFree(p);
  return code;
}
5267

5268
// Tears down a checkpoint manager: destroys every tracker stored in its table, then the
// lock, the path copy, the table and the manager itself. A NULL manager is ignored.
void bkdMgtDestroy(SBkdMgt* bm) {
  if (bm == NULL) {
    return;
  }

  // Each table entry stores a SDbChkp* by value.
  for (void* it = taosHashIterate(bm->pDbChkpTbl, NULL); it != NULL; it = taosHashIterate(bm->pDbChkpTbl, it)) {
    SDbChkp* tracker = *(SDbChkp**)it;
    dbChkpDestroy(tracker);
  }

  TAOS_UNUSED(taosThreadRwlockDestroy(&bm->rwLock));
  taosMemoryFree(bm->path);
  taosHashCleanup(bm->pDbChkpTbl);

  taosMemoryFree(bm);
}
5284
// Produces the checkpoint delta for task `taskId` at checkpoint `chkpId` and dumps it
// into `dname`; names of files to delete are appended to `list`.
//
// The first request for a task creates its tracker under <bm->path>/<taskId>, registers it
// in the manager's table and dumps the initial file set. Later requests refresh the existing
// tracker and dump the difference. Everything runs under bm->rwLock (write), which is
// released at the single exit point.
// Returns 0 on success, otherwise an error code.
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* dname) {
  int32_t code = 0;
  TAOS_UNUSED(taosThreadRwlockWrlock(&bm->rwLock));

  SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
  SDbChkp*  pChkp = ppChkp != NULL ? *ppChkp : NULL;

  if (pChkp == NULL) {
    int32_t cap = strlen(bm->path) + 64;
    char*   path = taosMemoryCalloc(1, cap);
    if (path == NULL) {
      code = terrno;
      goto _exit;
    }

    int32_t nBytes = snprintf(path, cap, "%s%s%s", bm->path, TD_DIRSEP, taskId);
    if (nBytes <= 0 || nBytes >= cap) {
      taosMemoryFree(path);
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _exit;
    }

    SDbChkp* p = NULL;
    code = dbChkpCreate(path, chkpId, &p);
    if (code != 0) {
      // NOTE(review): assumes dbChkpCreate() leaves `path` owned by the caller when it
      // fails - confirm against its cleanup path.
      taosMemoryFree(path);
      goto _exit;
    }

    // Take the failure code from the call itself; reading terrno after the cleanup below
    // could observe a value overwritten (or reset) in between.
    code = taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*));
    if (code != 0) {
      dbChkpDestroy(p);  // also releases `path`, which the tracker holds
      goto _exit;
    }

    code = dbChkpDumpTo(p, dname, list);
  } else {
    code = dbChkpGetDelta(pChkp, chkpId, NULL);
    if (code == 0) {
      code = dbChkpDumpTo(pChkp, dname, list);
    }
  }

_exit:
  TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));
  return code;
}
5335

5336
#ifdef BUILD_NO_CALL
5337
// Registers a new checkpoint tracker for `task`, rooted at `path`, in the manager's table.
// Returns 0 when the tracker was created and stored, -1 when a tracker for `task` already
// exists, or the error code of the failing create/insert step.
// NOTE(review): the tracker keeps `path` without copying it, so after a successful call the
// caller presumably must not free `path` - confirm the intended ownership.
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) {
  int32_t code = -1;

  TAOS_UNUSED(taosThreadRwlockWrlock(&bm->rwLock));
  SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
  if (pp == NULL) {
    SDbChkp* p = NULL;
    code = dbChkpCreate(path, 0, &p);
    // Only a successfully created tracker may be stored.
    if (code == 0) {
      code = taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*));
      if (code != 0) {
        // Nothing references the tracker if the insert failed; do not leak it.
        dbChkpDestroy(p);
      }
    }
  } else {
    stError("task chkp already exists");
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));

  return code;
}
5357

5358
// Dumps the pending delta of task `taskId` into `dname` without collecting a delete list.
// Returns TSDB_CODE_INVALID_PARA when no tracker is registered for the task, otherwise the
// result of dbChkpDumpTo().
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) {
  int32_t code = 0;
  TAOS_UNUSED(taosThreadRwlockRdlock(&bm->rwLock));

  // The table stores SDbChkp* values, so the lookup yields a pointer to that pointer.
  SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
  if (pp == NULL || *pp == NULL) {
    code = TSDB_CODE_INVALID_PARA;
  } else {
    code = dbChkpDumpTo(*pp, dname, NULL);
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));
  return code;
}
5368
#endif
5369

5370
// Opens a cursor on the "state" column family and positions it relative to `key`.
//
// The key is combined with pState->number into an SStateKey, encoded and handed to
// streamStateIterSeekAndValid(). Entries flagged by iterValueIsStale() are skipped by
// stepping backwards. If the decoded key under the iterator does not compare below the
// search key, the iterator is moved back once more.
// Returns the cursor (owned by the caller), or NULL when no cursor could be created, the
// seek fails, or the iterator becomes invalid while skipping stale entries.
SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateSeekKeyPrev_rocksdb");
  STaskDbWrapper* pDb =
      pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend : pState->pTdbState->pOwner->pBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  pCur->db = pDb->db;
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateKey target = {.key = *key, .opNum = pState->number};
  char      encoded[128] = {0};
  int       encodedLen = stateKeyEncode((void*)&target, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // Walk backwards over entries reported as stale.
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }

  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SWinKey found;
  size_t  foundLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
  TAOS_UNUSED(winKeyDecode((void*)&found, rawKey));

  // Step back once unless the search key already compares greater than the entry found.
  if (winKeyCmpr(&target, sizeof(target), &found, sizeof(found)) <= 0) {
    rocksdb_iter_prev(pCur->iter);
  }
  return pCur;
}
5413

5414
// Reads the entry under `pCur` via streamStateGetKVByCur_rocksdb() and accepts it only if
// its group id equals the group id `pKey` carried on entry.
// Returns 0 on a match. Returns -1 for a NULL cursor, a failed read, or a group mismatch;
// on a mismatch the value buffer is freed and *pVal reset to NULL (when pVal is non-NULL).
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey,
                                           const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }

  // Remember the requested group before the read overwrites *pKey.
  const uint64_t wantedGroup = pKey->groupId;

  if (streamStateGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen) != 0) {
    return -1;
  }
  if (pKey->groupId == wantedGroup) {
    return 0;
  }

  if (pVal != NULL) {
    taosMemoryFree((void*)*pVal);
    *pVal = NULL;
  }
  return -1;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc