• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.61
/source/libs/stream/src/streamBackendRocksdb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "lz4.h"
18
#include "streamInt.h"
19
#include "tcommon.h"
20
#include "tref.h"
21

22
typedef struct SCompactFilteFactory {
23
  void* status;
24
} SCompactFilteFactory;
25

26
typedef struct {
27
  rocksdb_t*                       db;
28
  rocksdb_column_family_handle_t** pHandle;
29
  rocksdb_writeoptions_t*          wOpt;
30
  rocksdb_readoptions_t*           rOpt;
31
  rocksdb_options_t**              cfOpt;
32
  rocksdb_options_t*               dbOpt;
33
  RocksdbCfParam*                  param;
34
  void*                            pBackend;
35
  SListNode*                       pCompareNode;
36
  rocksdb_comparator_t**           pCompares;
37
} RocksdbCfInst;
38

39
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
40

41
void            destroyRocksdbCfInst(RocksdbCfInst* inst);
42
int32_t         getCfIdx(const char* cfName);
43
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath);
44

45
static int32_t backendCopyFiles(const char* src, const char* dst);
46

47
void        destroyCompactFilteFactory(void* arg);
48
void        destroyCompactFilte(void* arg);
49
const char* compactFilteFactoryName(void* arg);
50
const char* compactFilteFactoryNameSess(void* arg);
51
const char* compactFilteFactoryNameState(void* arg);
52
const char* compactFilteFactoryNameFunc(void* arg);
53
const char* compactFilteFactoryNameFill(void* arg);
54

55
const char* compactFilteName(void* arg);
56
const char* compactFilteNameSess(void* arg);
57
const char* compactFilteNameState(void* arg);
58
const char* compactFilteNameFill(void* arg);
59
const char* compactFilteNameFunc(void* arg);
60

61
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
62
                           char** newval, size_t* newvlen, unsigned char* value_changed);
63
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);
64
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx);
65
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx);
66
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx);
67
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx);
68

69
typedef int (*__db_key_encode_fn_t)(void* key, char* buf);
70
typedef int (*__db_key_decode_fn_t)(void* key, char* buf);
71
typedef int (*__db_key_tostr_fn_t)(void* key, char* buf);
72
typedef const char* (*__db_key_cmpname_fn_t)(void* statue);
73
typedef int (*__db_key_cmp_fn_t)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
74
typedef void (*__db_key_cmp_destroy_fn_t)(void* state);
75
typedef int32_t (*__db_value_encode_fn_t)(void* value, int32_t vlen, int64_t ttl, char** dest);
76
typedef int32_t (*__db_value_decode_fn_t)(void* value, int32_t vlen, int64_t* ttl, char** dest);
77

78
typedef rocksdb_compactionfilter_t* (*__db_factory_create_fn_t)(void* arg, rocksdb_compactionfiltercontext_t* ctx);
79
typedef const char* (*__db_factory_name_fn_t)(void* arg);
80
typedef void (*__db_factory_destroy_fn_t)(void* arg);
81
typedef struct {
82
  const char*               key;
83
  int32_t                   len;
84
  int                       idx;
85
  __db_key_cmp_fn_t         cmpKey;
86
  __db_key_encode_fn_t      enFunc;
87
  __db_key_decode_fn_t      deFunc;
88
  __db_key_tostr_fn_t       toStrFunc;
89
  __db_key_cmpname_fn_t     cmpName;
90
  __db_key_cmp_destroy_fn_t destroyCmp;
91
  __db_value_encode_fn_t    enValueFunc;
92
  __db_value_decode_fn_t    deValueFunc;
93

94
  __db_factory_create_fn_t  createFilter;
95
  __db_factory_destroy_fn_t destroyFilter;
96
  __db_factory_name_fn_t    funcName;
97

98
} SCfInit;
99

100
const char* compareDefaultName(void* name);
101
const char* compareStateName(void* name);
102
const char* compareWinKeyName(void* name);
103
const char* compareSessionKeyName(void* name);
104
const char* compareFuncKeyName(void* name);
105
const char* compareParKeyName(void* name);
106
const char* comparePartagKeyName(void* name);
107

108
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
109
int defaultKeyEncode(void* k, char* buf);
110
int defaultKeyDecode(void* k, char* buf);
111
int defaultKeyToString(void* k, char* buf);
112

113
int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
114
int stateKeyEncode(void* k, char* buf);
115
int stateKeyDecode(void* k, char* buf);
116
int stateKeyToString(void* k, char* buf);
117

118
int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
119
int stateSessionKeyEncode(void* ses, char* buf);
120
int stateSessionKeyDecode(void* ses, char* buf);
121
int stateSessionKeyToString(void* k, char* buf);
122

123
int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
124
int winKeyEncode(void* k, char* buf);
125
int winKeyDecode(void* k, char* buf);
126
int winKeyToString(void* k, char* buf);
127

128
int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
129
int tupleKeyEncode(void* k, char* buf);
130
int tupleKeyDecode(void* k, char* buf);
131
int tupleKeyToString(void* k, char* buf);
132

133
int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
134
int parKeyEncode(void* k, char* buf);
135
int parKeyDecode(void* k, char* buf);
136
int parKeyToString(void* k, char* buf);
137

138
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest);
139
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest);
140
int32_t valueToString(void* k, char* buf);
141
int32_t valueIsStale(void* k, int64_t ts);
142

143
void        destroyCompare(void* arg);
144
static void cleanDir(const char* pPath, const char* id);
145

146
static bool                streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
147
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
148
                                                 rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
149

150
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
151
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
152

153
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId);
154
int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId);
155

156
int32_t  copyFiles(const char* src, const char* dst);
157
uint32_t nextPow2(uint32_t x);
158

159
SCfInit ginitDict[] = {
160
    {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
161
     destroyCompare, valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory,
162
     compactFilteFactoryName},
163

164
    {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyCompare,
165
     valueEncode, valueDecode, compactFilteFactoryCreateFilterState, destroyCompactFilteFactory,
166
     compactFilteFactoryNameState},
167

168
    {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyCompare,
169
     valueEncode, valueDecode, compactFilteFactoryCreateFilterFill, destroyCompactFilteFactory,
170
     compactFilteFactoryNameFill},
171

172
    {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
173
     compareSessionKeyName, destroyCompare, valueEncode, valueDecode, compactFilteFactoryCreateFilterSess,
174
     destroyCompactFilteFactory, compactFilteFactoryNameSess},
175

176
    {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyCompare,
177
     valueEncode, valueDecode, compactFilteFactoryCreateFilterFunc, destroyCompactFilteFactory,
178
     compactFilteFactoryNameFunc},
179

180
    {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyCompare,
181
     valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
182

183
    {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyCompare,
184
     valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
185
};
186

187
int32_t getCfIdx(const char* cfName) {
9,436✔
188
  int    idx = -1;
9,436✔
189
  size_t len = strlen(cfName);
9,436✔
190
  for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
9,670!
191
    if (len == ginitDict[i].len && strncmp(cfName, ginitDict[i].key, strlen(cfName)) == 0) {
9,670✔
192
      idx = i;
9,436✔
193
      break;
9,436✔
194
    }
195
  }
196
  return idx;
9,436✔
197
}
198

199
bool isValidCheckpoint(const char* dir) {
30✔
200
  // not implement yet
201
  return true;
30✔
202
}
203

204
/*
205
 *copy pChkpIdDir's file to state dir
206
 */
207
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
2✔
208
  // impl later
209
  int32_t code = 0;
2✔
210
  int32_t cap = strlen(path) + 64;
2✔
211
  int32_t nBytes = 0;
2✔
212

213
  char* state = taosMemoryCalloc(1, cap);
2!
214
  if (state == NULL) {
2!
215
    return terrno;
×
216
  }
217

218
  nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state");
2✔
219
  if (nBytes <= 0 || nBytes >= cap) {
2!
220
    taosMemoryFree(state);
×
221
    return TSDB_CODE_OUT_OF_RANGE;
×
222
  }
223

224
  if (chkpId != 0) {
2!
225
    char* chkp = taosMemoryCalloc(1, cap);
2!
226
    if (chkp == NULL) {
2!
227
      taosMemoryFree(state);
×
228
      return terrno;
×
229
    }
230

231
    nBytes = snprintf(chkp, cap, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
2✔
232
    if (nBytes <= 0 || nBytes >= cap) {
2!
233
      taosMemoryFree(state);
×
234
      taosMemoryFree(chkp);
×
235
      return TSDB_CODE_OUT_OF_RANGE;
×
236
    }
237

238
    if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
2!
239
      cleanDir(state, "");
×
240
      code = backendCopyFiles(chkp, state);
×
241
      if (code != 0) {
×
242
        stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(code)));
×
243
      } else {
244
        stInfo("start to restart stream backend at checkpoint path: %s", chkp);
×
245
      }
246

247
    } else {
248
      stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp,
2!
249
              tstrerror(terrno), state);
250
      code = taosMkDir(state);
2✔
251
      if (code != 0) {
2!
252
        code = TAOS_SYSTEM_ERROR(errno);
×
253
      }
254
    }
255

256
    taosMemoryFree(chkp);
2!
257
  }
258

259
  *dst = state;
2✔
260
  return code;
2✔
261
}
262

263
typedef struct {
264
  char    pCurrName[24];
265
  int64_t currChkptId;
266

267
  char    pManifestName[24];
268
  int64_t manifestChkptId;
269

270
  char    processName[24];
271
  int64_t processId;
272
} SSChkpMetaOnS3;
273

274
int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) {
×
275
  int32_t   code = 0;
×
276
  int32_t   cap = strlen(path) + 32;
×
277
  TdFilePtr pFile = NULL;
×
278

279
  char* metaPath = taosMemoryCalloc(1, cap);
×
280
  if (metaPath == NULL) {
×
281
    return terrno;
×
282
  }
283

284
  int32_t n = snprintf(metaPath, cap, "%s%s%s", path, TD_DIRSEP, "META");
×
285
  if (n <= 0 || n >= cap) {
×
286
    taosMemoryFree(metaPath);
×
287
    return TSDB_CODE_OUT_OF_MEMORY;
×
288
  }
289

290
  pFile = taosOpenFile(path, TD_FILE_READ);
×
291
  if (pFile == NULL) {
×
292
    code = terrno;
×
293
    goto _EXIT;
×
294
  }
295

296
  char buf[256] = {0};
×
297
  if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
×
298
    code = terrno;
×
299
    goto _EXIT;
×
300
  }
301

302
  SSChkpMetaOnS3* p = taosMemoryCalloc(1, sizeof(SSChkpMetaOnS3));
×
303
  if (p == NULL) {
×
304
    code = terrno;
×
305
    goto _EXIT;
×
306
  }
307
  n = sscanf(buf, META_ON_S3_FORMATE, p->pCurrName, &p->currChkptId, p->pManifestName, &p->manifestChkptId,
×
308
             p->processName, &p->processId);
×
309
  if (n != 6) {
×
310
    code = TSDB_CODE_INVALID_MSG;
×
311
    taosMemoryFree(p);
×
312
    goto _EXIT;
×
313
  }
314

315
  if (p->currChkptId != p->manifestChkptId) {
×
316
    code = TSDB_CODE_INVALID_MSG;
×
317
    taosMemoryFree(p);
×
318
    goto _EXIT;
×
319
  }
320
  *pMeta = p;
×
321
  code = 0;
×
322
_EXIT:
×
323
  taosCloseFile(&pFile);
×
324
  taosMemoryFree(metaPath);
×
325
  return code;
×
326
}
327

328
int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t chkpId) {
×
329
  int32_t code = 0;
×
330
  int32_t nBytes = 0;
×
331

332
  int32_t cap = strlen(path) + 64;
×
333
  char*   src = taosMemoryCalloc(1, cap);
×
334
  char*   dst = taosMemoryCalloc(1, cap);
×
335
  if (src == NULL || dst == NULL) {
×
336
    code = terrno;
×
337
    goto _EXIT;
×
338
  }
339

340
  if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) {
×
341
    code = TSDB_CODE_INVALID_CFG;
×
342
    goto _EXIT;
×
343
  }
344
  // rename current_chkp/mainfest to current
345
  for (int i = 0; i < 2; i++) {
×
346
    char* key = (i == 0 ? pMeta->pCurrName : pMeta->pManifestName);
×
347
    if (strlen(key) <= 0) {
×
348
      code = TSDB_CODE_INVALID_PARA;
×
349
      goto _EXIT;
×
350
    }
351

352
    nBytes = snprintf(src, cap, "%s%s%s_%" PRId64 "", path, TD_DIRSEP, key, pMeta->currChkptId);
×
353
    if (nBytes <= 0 || nBytes >= cap) {
×
354
      code = TSDB_CODE_OUT_OF_RANGE;
×
355
      goto _EXIT;
×
356
    }
357

358
    if (taosStatFile(src, NULL, NULL, NULL) != 0) {
×
359
      code = terrno;
×
360
      goto _EXIT;
×
361
    }
362

363
    nBytes = snprintf(dst, cap, "%s%s%s", path, TD_DIRSEP, key);
×
364
    if (nBytes <= 0 || nBytes >= cap) {
×
365
      code = TSDB_CODE_OUT_OF_RANGE;
×
366
      goto _EXIT;
×
367
    }
368

369
    code = taosRenameFile(src, dst);
×
370
    if (code != 0) {
×
371
      goto _EXIT;
×
372
    }
373

374
    memset(src, 0, cap);
×
375
    memset(dst, 0, cap);
×
376
  }
377
  code = 0;
×
378

379
// rename manifest_chkp to manifest
380
_EXIT:
×
381
  taosMemoryFree(src);
×
382
  taosMemoryFree(dst);
×
383
  return code;
×
384
}
385
int32_t remoteChkpGetDelFile(char* path, SArray* toDel) {
×
386
  int32_t code = 0;
×
387
  int32_t nBytes = 0;
×
388

389
  SSChkpMetaOnS3* pMeta = NULL;
×
390
  code = remoteChkp_readMetaData(path, &pMeta);
×
391
  if (code != 0) {
×
392
    return code;
×
393
  }
394

395
  for (int i = 0; i < 2; i++) {
×
396
    char* key = (i == 0 ? pMeta->pCurrName : pMeta->pManifestName);
×
397

398
    int32_t cap = strlen(key) + 32;
×
399
    char*   p = taosMemoryCalloc(1, cap);
×
400
    if (p == NULL) {
×
401
      taosMemoryFree(pMeta);
×
402
      return terrno;
×
403
    }
404

405
    nBytes = snprintf(p, cap, "%s_%" PRId64 "", key, pMeta->currChkptId);
×
406
    if (nBytes <= 0 || nBytes >= cap) {
×
407
      taosMemoryFree(pMeta);
×
408
      taosMemoryFree(p);
×
409
      return TSDB_CODE_OUT_OF_RANGE;
×
410
    }
411
    if (taosArrayPush(toDel, &p) == NULL) {
×
412
      taosMemoryFree(pMeta);
×
413
      taosMemoryFree(p);
×
414
      return terrno;
×
415
    }
416
  }
417

418
  return 0;
×
419
}
420

421
void cleanDir(const char* pPath, const char* id) {
4,679✔
422
  if (pPath == NULL) {
4,679!
423
    stError("%s try to clean dir, but path is NULL", id);
×
424
    return;
×
425
  }
426

427
  if (taosIsDir(pPath)) {
4,679!
428
    taosRemoveDir(pPath);
4,679✔
429
    if (taosMkDir(pPath) != 0) {
4,679!
430
      stError("%s failed to create dir:%s", id, pPath);
×
431
    }
432
    stInfo("%s clear dir:%s, succ", id, pPath);
4,678!
433
  }
434
}
435

436
int32_t createDirIfNotExist(const char* pPath) {
14,031✔
437
  if (!taosIsDir(pPath)) {
14,031✔
438
    return taosMulMkDir(pPath);
13,697✔
439
  } else {
440
    return 0;
336✔
441
  }
442
}
443

444
int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) {
×
445
  int32_t code = 0;
×
446
  if (taosIsDir(checkpointPath)) {
×
447
    taosRemoveDir(checkpointPath);
×
448
    stDebug("remove local checkpoint data dir:%s succ", checkpointPath);
×
449
  }
450

451
  cleanDir(defaultPath, key);
×
452
  stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);
×
453

454
  code = streamTaskDownloadCheckpointData(key, checkpointPath, checkpointId);
×
455
  if (code != 0) {
×
456
    stError("failed to download checkpoint data:%s", key);
×
457
    return code;
×
458
  }
459

460
  stDebug("download remote checkpoint data for checkpointId:%" PRId64 ", %s", checkpointId, key);
×
461
  return backendCopyFiles(checkpointPath, defaultPath);
×
462
}
463

464
int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) {
×
465
  SSChkpMetaOnS3* pMeta = NULL;
×
466

467
  int32_t code = remoteChkp_readMetaData(chkpPath, &pMeta);
×
468
  if (code != 0) {
×
469
    return code;
×
470
  }
471

472
  if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) {
×
473
    taosMemoryFree(pMeta);
×
474
    return TSDB_CODE_INVALID_PARA;
×
475
  }
476

477
  code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId);
×
478
  if (code != 0) {
×
479
    taosMemoryFree(pMeta);
×
480
    return code;
×
481
  }
482
  taosMemoryFree(pMeta);
×
483

484
  return chkpAddExtraInfo(chkpPath, chkpId, pMeta->processId);
×
485
}
486

487
int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
×
488
  int8_t  rename = 0;
×
489
  int32_t code = streamTaskDownloadCheckpointData(key, chkpPath, chkpId);
×
490
  if (code != 0) {
×
491
    return code;
×
492
  }
493

494
  int32_t cap = strlen(defaultPath) + 32;
×
495

496
  char* defaultTmp = taosMemoryCalloc(1, cap);
×
497
  if (defaultTmp == NULL) {
×
498
    return terrno;
×
499
  }
500

501
  int32_t nBytes = snprintf(defaultPath, cap, "%s%s", defaultPath, "_tmp");
×
502
  if (nBytes <= 0 || nBytes >= cap) {
×
503
    taosMemoryFree(defaultPath);
×
504
    return TSDB_CODE_OUT_OF_RANGE;
×
505
  }
506

507
  if (taosIsDir(defaultTmp)) taosRemoveDir(defaultTmp);
×
508
  if (taosIsDir(defaultPath)) {
×
509
    code = taosRenameFile(defaultPath, defaultTmp);
×
510
    if (code != 0) {
×
511
      goto _EXIT;
×
512
    } else {
513
      rename = 1;
×
514
    }
515
  } else {
516
    code = taosMkDir(defaultPath);
×
517
    if (code != 0) {
×
518
      code = TAOS_SYSTEM_ERROR(errno);
×
519
      goto _EXIT;
×
520
    }
521
  }
522

523
  code = rebuildDataFromS3(chkpPath, chkpId);
×
524
  if (code != 0) {
×
525
    goto _EXIT;
×
526
  }
527

528
  code = backendCopyFiles(chkpPath, defaultPath);
×
529
  if (code != 0) {
×
530
    goto _EXIT;
×
531
  }
532
  code = 0;
×
533

534
_EXIT:
×
535
  if (code != 0) {
×
536
    if (rename) {
×
537
      TAOS_UNUSED(taosRenameFile(defaultTmp, defaultPath));
×
538
    }
539
  }
540

541
  if (taosIsDir(defaultPath)) {
×
542
    taosRemoveDir(defaultPath);
×
543
  }
544

545
  taosMemoryFree(defaultTmp);
×
546
  return code;
×
547
}
548

549
int32_t rebuildFromRemoteCheckpoint(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) {
17✔
550
  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
17✔
551
  if (type == DATA_UPLOAD_S3) {
17!
552
    return rebuildFromRemoteChkp_s3(key, checkpointPath, checkpointId, defaultPath);
×
553
  } else if (type == DATA_UPLOAD_RSYNC) {
17!
554
    return rebuildFromRemoteChkp_rsync(key, checkpointPath, checkpointId, defaultPath);
×
555
  } else {
556
    stError("%s no remote backup checkpoint data for:%" PRId64, key, checkpointId);
17!
557
  }
558

559
  return -1;
17✔
560
}
561

562
int32_t copyFiles_create(char* src, char* dst, int8_t type) {
60✔
563
  // create and copy file
564
  int32_t err = taosCopyFile(src, dst);
60✔
565

566
  if (errno == EXDEV || errno == ENOTSUP) {
60!
567
    errno = 0;
×
568
    return 0;
×
569
  }
570
  return 0;
60✔
571
}
572
int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) {
120✔
573
  // same fs and hard link
574
  return taosLinkFile(src, dst);
120✔
575
}
576

577
int32_t backendFileCopyFilesImpl(const char* src, const char* dst) {
30✔
578
  const char* current = "CURRENT";
30✔
579
  size_t      currLen = strlen(current);
30✔
580

581
  const char* info = "info";
30✔
582
  size_t      infoLen = strlen(info);
30✔
583

584
  int32_t code = 0;
30✔
585
  int32_t sLen = strlen(src);
30✔
586
  int32_t dLen = strlen(dst);
30✔
587
  int32_t cap = TMAX(sLen, dLen) + 64;
30✔
588
  int32_t nBytes = 0;
30✔
589

590
  char* srcName = taosMemoryCalloc(1, cap);
30!
591
  char* dstName = taosMemoryCalloc(1, cap);
30!
592
  if (srcName == NULL || dstName == NULL) {
30!
593
    taosMemoryFree(srcName);
×
594
    taosMemoryFree(dstName);
×
595
    return terrno;
×
596
  }
597

598
  // copy file to dst
599
  TdDirPtr pDir = taosOpenDir(src);
30✔
600
  if (pDir == NULL) {
30!
601
    code = terrno;
×
602
    goto _ERROR;
×
603
  }
604

605
  errno = 0;
30✔
606
  TdDirEntryPtr de = NULL;
30✔
607
  while ((de = taosReadDir(pDir)) != NULL) {
270✔
608
    char* name = taosGetDirEntryName(de);
240✔
609
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) {
240✔
610
      continue;
60✔
611
    }
612

613
    nBytes = snprintf(srcName, cap, "%s%s%s", src, TD_DIRSEP, name);
180✔
614
    if (nBytes <= 0 || nBytes >= cap) {
180!
615
      code = TSDB_CODE_OUT_OF_RANGE;
×
616
      goto _ERROR;
×
617
    }
618

619
    nBytes = snprintf(dstName, cap, "%s%s%s", dst, TD_DIRSEP, name);
180✔
620
    if (nBytes <= 0 || nBytes >= cap) {
180!
621
      code = TSDB_CODE_OUT_OF_RANGE;
×
622
      goto _ERROR;
×
623
    }
624

625
    if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) {
180✔
626
      code = copyFiles_create(srcName, dstName, 0);
30✔
627
      if (code != 0) {
30!
628
        code = TAOS_SYSTEM_ERROR(errno);
×
629
        stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
×
630
        goto _ERROR;
×
631
      }
632
    } else if (strncmp(name, info, strlen(name) <= infoLen ? strlen(name) : infoLen) == 0) {
150✔
633
      code = copyFiles_create(srcName, dstName, 0);
30✔
634
      if (code != 0) {
30!
635
        code = TAOS_SYSTEM_ERROR(errno);
×
636
        stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
×
637
        goto _ERROR;
×
638
      }
639

640
    } else {
641
      code = copyFiles_hardlink(srcName, dstName, 0);
120✔
642
      if (code != 0) {
120!
643
        code = TAOS_SYSTEM_ERROR(errno);
×
644
        stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code));
×
645
        goto _ERROR;
×
646
      } else {
647
        stDebug("succ hard link file:%s to %s", srcName, dstName);
120✔
648
      }
649
    }
650

651
    memset(srcName, 0, cap);
180✔
652
    memset(dstName, 0, cap);
180✔
653
  }
654

655
  taosMemoryFreeClear(srcName);
30!
656
  taosMemoryFreeClear(dstName);
30!
657
  TAOS_UNUSED(taosCloseDir(&pDir));
30✔
658
  return code;
30✔
659

660
_ERROR:
×
661
  taosMemoryFreeClear(srcName);
×
662
  taosMemoryFreeClear(dstName);
×
663
  TAOS_UNUSED(taosCloseDir(&pDir));
×
664
  return code;
×
665
}
666

667
int32_t backendCopyFiles(const char* src, const char* dst) { return backendFileCopyFilesImpl(src, dst); }
30✔
668

669
static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId,
47✔
670
                                          const char* defaultPath, int64_t* processVer) {
671
  int32_t code = 0;
47✔
672
  cleanDir(defaultPath, pTaskIdStr);
47✔
673

674
  if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) {
47!
675
    stDebug("%s local checkpoint data existed, checkpointId:%" PRId64 " copy to backend dir", pTaskIdStr, checkpointId);
30✔
676

677
    code = backendCopyFiles(checkpointPath, defaultPath);
30✔
678
    if (code != TSDB_CODE_SUCCESS) {
30!
679
      cleanDir(defaultPath, pTaskIdStr);
×
680
      stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote",
×
681
              pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(code)));
682
      code = TSDB_CODE_SUCCESS;
×
683
    } else {
684
      stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath,
30!
685
             defaultPath);
686
    }
687
  } else {
688
    code = terrno;
17✔
689
    stError("%s no valid data for checkpointId:%" PRId64 " in %s", pTaskIdStr, checkpointId, checkpointPath);
17!
690
  }
691

692
  return code;
47✔
693
}
694

695
int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath,
4,677✔
696
                              int64_t* processVer) {
697
  int32_t code = 0;
4,677✔
698

699
  char* prefixPath = NULL;
4,677✔
700
  char* defaultPath = NULL;
4,677✔
701
  char* checkpointPath = NULL;
4,677✔
702
  char* checkpointRoot = NULL;
4,677✔
703

704
  int32_t cap = strlen(path) + 128;
4,677✔
705
  int32_t nBytes;
706

707
  // alloc buf
708
  prefixPath = taosMemoryCalloc(1, cap);
4,677!
709
  defaultPath = taosMemoryCalloc(1, cap);
4,679!
710
  checkpointPath = taosMemoryCalloc(1, cap);
4,679!
711
  checkpointRoot = taosMemoryCalloc(1, cap);
4,679!
712
  if (prefixPath == NULL || defaultPath == NULL || checkpointPath == NULL || checkpointRoot == NULL) {
4,679!
713
    code = terrno;
×
714
    goto _EXIT;
×
715
  }
716

717
  nBytes = snprintf(prefixPath, cap, "%s%s%s", path, TD_DIRSEP, key);
4,679✔
718
  if (nBytes <= 0 || nBytes >= cap) {
4,679!
719
    code = TSDB_CODE_OUT_OF_RANGE;
×
720
    goto _EXIT;
×
721
  }
722

723
  code = createDirIfNotExist(prefixPath);
4,679✔
724
  if (code != 0) {
4,678!
725
    code = TAOS_SYSTEM_ERROR(errno);
×
726
    goto _EXIT;
×
727
  }
728

729
  nBytes = snprintf(defaultPath, cap, "%s%s%s", prefixPath, TD_DIRSEP, "state");
4,678✔
730
  if (nBytes <= 0 || nBytes >= cap) {
4,678!
731
    code = TSDB_CODE_OUT_OF_RANGE;
×
732
    goto _EXIT;
×
733
  }
734

735
  code = createDirIfNotExist(defaultPath);
4,679✔
736
  if (code != 0) {
4,679!
737
    code = TAOS_SYSTEM_ERROR(errno);
×
738
    goto _EXIT;
×
739
  }
740

741
  nBytes = snprintf(checkpointRoot, cap, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints");
4,679✔
742
  if (nBytes <= 0 || nBytes >= cap) {
4,679!
743
    code = TSDB_CODE_OUT_OF_RANGE;
×
744
    goto _EXIT;
×
745
  }
746

747
  code = createDirIfNotExist(checkpointRoot);
4,679✔
748
  if (code != 0) {
4,677!
749
    code = TAOS_SYSTEM_ERROR(errno);
×
750
    goto _EXIT;
×
751
  }
752

753
  stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId);
4,677✔
754
  if (chkptId > 0) {
4,678✔
755
    nBytes = snprintf(checkpointPath, cap, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP,
47✔
756
                      "checkpoint", chkptId);
757
    if (nBytes <= 0 || nBytes >= cap) {
47!
758
      code = TSDB_CODE_OUT_OF_RANGE;
×
759
      goto _EXIT;
×
760
    }
761

762
    code = rebuildFromLocalCheckpoint(key, checkpointPath, chkptId, defaultPath, processVer);
47✔
763
    if (code != 0) {
47✔
764
      code = rebuildFromRemoteCheckpoint(key, checkpointPath, chkptId, defaultPath);
17✔
765
    }
766

767
    if (code != 0) {
47✔
768
      stError("failed to start stream backend at %s, restart from defaultPath:%s, reason:%s", checkpointPath,
17!
769
              defaultPath, tstrerror(code));
770
      code = 0;  // reset the error code
17✔
771
    }
772
  } else {  // no valid checkpoint id
773
    stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key,
4,631!
774
           defaultPath);
775
    cleanDir(defaultPath, key);
4,632✔
776
  }
777

778
  *dbPath = defaultPath;
4,679✔
779
  *dbPrefixPath = prefixPath;
4,679✔
780
  defaultPath = NULL;
4,679✔
781
  prefixPath = NULL;
4,679✔
782

783
  code = 0;
4,679✔
784

785
_EXIT:
4,678✔
786
  taosMemoryFree(defaultPath);
4,678!
787
  taosMemoryFree(prefixPath);
4,679!
788
  taosMemoryFree(checkpointPath);
4,679!
789
  taosMemoryFree(checkpointRoot);
4,679!
790
  return code;
4,679✔
791
}
792
bool streamBackendDataIsExist(const char* path, int64_t chkpId) {
141✔
793
  bool    exist = true;
141✔
794
  int32_t cap = strlen(path) + 32;
141✔
795

796
  char* state = taosMemoryCalloc(1, cap);
141!
797
  if (state == NULL) {
141!
798
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
799
    return false;
×
800
  }
801

802
  int16_t nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state");
141✔
803
  if (nBytes <= 0 || nBytes >= cap) {
141!
804
    terrno = TSDB_CODE_OUT_OF_RANGE;
×
805
    exist = false;
×
806
  } else {
807
    if (!taosDirExist(state)) {
141!
808
      exist = false;
141✔
809
    }
810
  }
811

812
  taosMemoryFree(state);
141!
813
  return exist;
141✔
814
}
815

816
int32_t streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId, SBackendWrapper** pBackend) {
2✔
817
  char*   backendPath = NULL;
2✔
818
  int32_t code = 0;
2✔
819
  int32_t lino = 0;
2✔
820
  char*   err = NULL;
2✔
821
  size_t  nCf = 0;
2✔
822

823
  *pBackend = NULL;
2✔
824

825
  code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
2✔
826
  TSDB_CHECK_CODE(code, lino, _EXIT);
2!
827

828
  stDebug("start to init stream backend:%s, checkpointId:%" PRId64 " vgId:%d", backendPath, chkpId, vgId);
2!
829

830
  uint32_t         dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
2✔
831
  SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
2!
832
  TSDB_CHECK_NULL(pHandle, code, lino, _EXIT, terrno);
2!
833

834
  pHandle->list = tdListNew(sizeof(SCfComparator));
2✔
835
  TSDB_CHECK_NULL(pHandle->list, code, lino, _EXIT, terrno);
2!
836

837
  code = taosThreadMutexInit(&pHandle->mutex, NULL);
2✔
838
  TSDB_CHECK_CODE(code, lino, _EXIT);
2!
839

840
  code = taosThreadMutexInit(&pHandle->cfMutex, NULL);
2✔
841
  TSDB_CHECK_CODE(code, lino, _EXIT);
2!
842

843
  pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
2✔
844
  TSDB_CHECK_NULL(pHandle->cfInst, code, lino, _EXIT, terrno);
2!
845

846
  rocksdb_env_t* env = rocksdb_create_default_env();  // rocksdb_envoptions_create();
2✔
847

848
  int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2;
2!
849
  rocksdb_env_set_low_priority_background_threads(env, nBGThread);
2✔
850
  rocksdb_env_set_high_priority_background_threads(env, nBGThread);
2✔
851

852
  rocksdb_cache_t* cache = rocksdb_cache_create_lru(dbMemLimit / 2);
2✔
853

854
  rocksdb_options_t* opts = rocksdb_options_create();
2✔
855
  rocksdb_options_set_env(opts, env);
2✔
856
  rocksdb_options_set_create_if_missing(opts, 1);
2✔
857
  rocksdb_options_set_create_missing_column_families(opts, 1);
2✔
858
  rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
2✔
859
  rocksdb_options_set_recycle_log_file_num(opts, 6);
2✔
860
  rocksdb_options_set_max_write_buffer_number(opts, 3);
2✔
861
  rocksdb_options_set_info_log_level(opts, 1);
2✔
862
  rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit);
2✔
863
  rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2);
2✔
864
  rocksdb_options_set_atomic_flush(opts, 1);
2✔
865

866
  pHandle->env = env;
2✔
867
  pHandle->dbOpt = opts;
2✔
868
  pHandle->cache = cache;
2✔
869
  pHandle->filterFactory = rocksdb_compactionfilterfactory_create(
2✔
870
      NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
871
  rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory);
2✔
872

873
  char** cfs = rocksdb_list_column_families(opts, backendPath, &nCf, &err);
2✔
874
  if (nCf == 0 || nCf == 1 || err != NULL) {
2!
875
    taosMemoryFreeClear(err);
2!
876
    pHandle->db = rocksdb_open(opts, backendPath, &err);
2✔
877
    if (err != NULL) {
2!
878
      stError("failed to open rocksdb, path:%s, reason:%s", backendPath, err);
×
879
      taosMemoryFreeClear(err);
×
880
      rocksdb_list_column_families_destroy(cfs, nCf);
×
881
      goto _EXIT;
×
882
    }
883
  } else {
884
    /*
885
      list all cf and get prefix
886
    */
887
    code = streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf);
×
888
    if (code != 0) {
×
889
      rocksdb_list_column_families_destroy(cfs, nCf);
×
890
      goto _EXIT;
×
891
    }
892
  }
893

894
  if (cfs != NULL) {
2!
895
    rocksdb_list_column_families_destroy(cfs, nCf);
2✔
896
  }
897

898
  stDebug("init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId);
2!
899
  taosMemoryFreeClear(backendPath);
2!
900

901
  *pBackend = pHandle;
2✔
902
  return code;
2✔
903

904
_EXIT:
×
905
  rocksdb_options_destroy(opts);
×
906
  rocksdb_cache_destroy(cache);
×
907
  rocksdb_env_destroy(env);
×
908
  streamMutexDestroy(&pHandle->mutex);
×
909
  streamMutexDestroy(&pHandle->cfMutex);
×
910
  taosHashCleanup(pHandle->cfInst);
×
911
  pHandle->list = tdListFree(pHandle->list);
×
912
  taosMemoryFree(pHandle);
×
913
  stDebug("failed to init stream backend at %s, vgId:%d line:%d code:%s", backendPath, vgId, lino, tstrerror(code));
×
914
  taosMemoryFree(backendPath);
×
915
  return code;
×
916
}
917
void streamBackendCleanup(void* arg) {
2✔
918
  SBackendWrapper* pHandle = (SBackendWrapper*)arg;
2✔
919

920
  void* pIter = taosHashIterate(pHandle->cfInst, NULL);
2✔
921
  while (pIter != NULL) {
2!
922
    RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
×
923
    destroyRocksdbCfInst(inst);
×
924
    pIter = taosHashIterate(pHandle->cfInst, pIter);
×
925
  }
926

927
  taosHashCleanup(pHandle->cfInst);
2✔
928

929
  if (pHandle->db) {
2!
930
    rocksdb_close(pHandle->db);
2✔
931
    pHandle->db = NULL;
2✔
932
  }
933
  rocksdb_options_destroy(pHandle->dbOpt);
2✔
934
  rocksdb_env_destroy(pHandle->env);
2✔
935
  rocksdb_cache_destroy(pHandle->cache);
2✔
936

937
  SListNode* head = tdListPopHead(pHandle->list);
2✔
938
  while (head != NULL) {
2!
939
    streamStateDestroyCompar(head->data);
×
940
    taosMemoryFree(head);
×
941
    head = tdListPopHead(pHandle->list);
×
942
  }
943

944
  pHandle->list = tdListFree(pHandle->list);
2✔
945
  streamMutexDestroy(&pHandle->mutex);
2✔
946

947
  streamMutexDestroy(&pHandle->cfMutex);
2✔
948
  stDebug("destroy stream backend :%p", pHandle);
2!
949
  taosMemoryFree(pHandle);
2!
950
  return;
2✔
951
}
952
void streamBackendHandleCleanup(void* arg) {
×
953
  SBackendCfWrapper* wrapper = arg;
×
954
  bool               remove = wrapper->remove;
×
955
  TAOS_UNUSED(taosThreadRwlockWrlock(&wrapper->rwLock));
×
956

957
  stDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
×
958
  if (wrapper->rocksdb == NULL) {
×
959
    TAOS_UNUSED(taosThreadRwlockUnlock(&wrapper->rwLock));
×
960
    return;
×
961
  }
962

963
  int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
×
964

965
  char* err = NULL;
×
966
  if (remove) {
×
967
    for (int i = 0; i < cfLen; i++) {
×
968
      if (wrapper->pHandle[i] != NULL) rocksdb_drop_column_family(wrapper->rocksdb, wrapper->pHandle[i], &err);
×
969
      if (err != NULL) {
×
970
        stError("failed to drop cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
×
971
        taosMemoryFreeClear(err);
×
972
      }
973
    }
974
  } else {
975
    rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
×
976
    rocksdb_flushoptions_set_wait(flushOpt, 1);
×
977

978
    for (int i = 0; i < cfLen; i++) {
×
979
      if (wrapper->pHandle[i] != NULL) rocksdb_flush_cf(wrapper->rocksdb, flushOpt, wrapper->pHandle[i], &err);
×
980
      if (err != NULL) {
×
981
        stError("failed to flush cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
×
982
        taosMemoryFreeClear(err);
×
983
      }
984
    }
985
    rocksdb_flushoptions_destroy(flushOpt);
×
986
  }
987

988
  for (int i = 0; i < cfLen; i++) {
×
989
    if (wrapper->pHandle[i] != NULL) {
×
990
      rocksdb_column_family_handle_destroy(wrapper->pHandle[i]);
×
991
    }
992
  }
993
  taosMemoryFreeClear(wrapper->pHandle);
×
994

995
  for (int i = 0; i < cfLen; i++) {
×
996
    rocksdb_options_destroy(wrapper->cfOpts[i]);
×
997
    rocksdb_block_based_options_destroy(((RocksdbCfParam*)wrapper->param)[i].tableOpt);
×
998
  }
999

1000
  if (remove) {
×
1001
    streamBackendDelCompare(wrapper->pBackend, wrapper->pComparNode);
×
1002
  }
1003
  rocksdb_writeoptions_destroy(wrapper->writeOpts);
×
1004
  wrapper->writeOpts = NULL;
×
1005

1006
  rocksdb_readoptions_destroy(wrapper->readOpts);
×
1007
  wrapper->readOpts = NULL;
×
1008
  taosMemoryFreeClear(wrapper->cfOpts);
×
1009
  taosMemoryFreeClear(wrapper->param);
×
1010
  TAOS_UNUSED(taosThreadRwlockUnlock(&wrapper->rwLock));
×
1011

1012
  TAOS_UNUSED(taosThreadRwlockDestroy(&wrapper->rwLock));
×
1013
  wrapper->rocksdb = NULL;
×
1014
  // taosReleaseRef(streamBackendId, wrapper->backendId);
1015

1016
  stDebug("end to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
×
1017
  taosMemoryFree(wrapper);
×
1018
  return;
×
1019
}
1020

1021
#ifdef BUILD_NO_CALL
1022
int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) {
1023
  SStreamMeta* pMeta = arg;
1024
  taosWLockLatch(&pMeta->chkpDirLock);
1025
  int64_t tc = 0;
1026
  int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
1027
  if (sz <= 0) {
1028
    taosWUnLockLatch(&pMeta->chkpDirLock);
1029
    return -1;
1030
  } else {
1031
    tc = *(int64_t*)taosArrayGetLast(pMeta->chkpSaved);
1032
  }
1033

1034
  taosArrayPush(pMeta->chkpInUse, &tc);
1035

1036
  *checkpoint = tc;
1037
  taosWUnLockLatch(&pMeta->chkpDirLock);
1038
  return 0;
1039
}
1040
/*
1041
 *  checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
1042
 *  chkpInUse: |--cp2--|--cp4--|
1043
 *  chkpInUse is doing translation, cannot del until
1044
 *  replication is finished
1045
 */
1046
int32_t delObsoleteCheckpoint(void* arg, const char* path) {
1047
  SStreamMeta* pMeta = arg;
1048

1049
  taosWLockLatch(&pMeta->chkpDirLock);
1050

1051
  SArray* chkpDel = taosArrayInit(10, sizeof(int64_t));
1052
  SArray* chkpDup = taosArrayInit(10, sizeof(int64_t));
1053

1054
  int64_t firsId = 0;
1055
  if (taosArrayGetSize(pMeta->chkpInUse) >= 1) {
1056
    firsId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
1057

1058
    for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) {
1059
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1060
      if (id >= firsId) {
1061
        taosArrayPush(chkpDup, &id);
1062
      } else {
1063
        taosArrayPush(chkpDel, &id);
1064
      }
1065
    }
1066
  } else {
1067
    int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
1068
    int32_t dsz = sz - pMeta->chkpCap;  // del size
1069

1070
    for (int i = 0; i < dsz; i++) {
1071
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1072
      taosArrayPush(chkpDel, &id);
1073
    }
1074
    for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
1075
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1076
      taosArrayPush(chkpDup, &id);
1077
    }
1078
  }
1079
  taosArrayDestroy(pMeta->chkpSaved);
1080
  pMeta->chkpSaved = chkpDup;
1081

1082
  taosWUnLockLatch(&pMeta->chkpDirLock);
1083

1084
  for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
1085
    int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
1086
    char    tbuf[256] = {0};
1087
    sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id);
1088
    if (taosIsDir(tbuf)) {
1089
      taosRemoveDir(tbuf);
1090
    }
1091
  }
1092
  taosArrayDestroy(chkpDel);
1093
  return 0;
1094
}
1095
#endif
1096
/*
1097
 *  checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
1098
 *  chkpInUse: |--cp2--|--cp4--|
1099
 *  chkpInUse is doing translation, cannot del until
1100
 *  replication is finished
1101
 */
1102
int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
2,195✔
1103
  int32_t         code = 0;
2,195✔
1104
  STaskDbWrapper* pBackend = arg;
2,195✔
1105
  SArray *        chkpDel = NULL, *chkpDup = NULL;
2,195✔
1106
  TAOS_UNUSED(taosThreadRwlockWrlock(&pBackend->chkpDirLock));
2,195✔
1107

1108
  if (taosArrayPush(pBackend->chkpSaved, &chkpId) == NULL) {
4,390!
1109
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1110
  }
1111

1112
  chkpDel = taosArrayInit(8, sizeof(int64_t));
2,195✔
1113
  if (chkpDel == NULL) {
2,195!
1114
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1115
  }
1116

1117
  chkpDup = taosArrayInit(8, sizeof(int64_t));
2,195✔
1118
  if (chkpDup == NULL) {
2,195!
1119
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1120
  }
1121

1122
  int64_t firsId = 0;
2,195✔
1123
  if (taosArrayGetSize(pBackend->chkpInUse) >= 1) {
2,195✔
1124
    firsId = *(int64_t*)taosArrayGet(pBackend->chkpInUse, 0);
6✔
1125

1126
    for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) {
12✔
1127
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
6✔
1128
      if (id >= firsId) {
6!
1129
        if (taosArrayPush(chkpDup, &id) == NULL) {
6!
1130
          TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1131
        }
1132
      } else {
1133
        if (taosArrayPush(chkpDel, &id) == NULL) {
×
1134
          TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1135
        }
1136
      }
1137
    }
1138
  } else {
1139
    int32_t sz = taosArrayGetSize(pBackend->chkpSaved);
2,189✔
1140
    int32_t dsz = sz - pBackend->chkpCap;  // del size
2,189✔
1141

1142
    for (int i = 0; i < dsz; i++) {
2,189!
1143
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
×
1144
      if (taosArrayPush(chkpDel, &id) == NULL) {
×
1145
        TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1146
      }
1147
    }
1148
    for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
4,418✔
1149
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
2,229✔
1150
      if (taosArrayPush(chkpDup, &id) == NULL) {
2,229!
1151
        TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1152
      }
1153
    }
1154
  }
1155

1156
  taosArrayDestroy(pBackend->chkpSaved);
2,195✔
1157
  pBackend->chkpSaved = chkpDup;
2,195✔
1158
  chkpDup = NULL;
2,195✔
1159

1160
  TAOS_UNUSED(taosThreadRwlockUnlock(&pBackend->chkpDirLock));
2,195✔
1161

1162
  for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
2,195!
1163
    int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
×
1164
    char    tbuf[256] = {0};
×
1165
    if (snprintf(tbuf, sizeof(tbuf), "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id) >= sizeof(tbuf)) {
×
1166
      code = TSDB_CODE_OUT_OF_RANGE;
×
1167
      TAOS_CHECK_GOTO(code, NULL, _exception);
×
1168
    }
1169

1170
    stInfo("backend remove obsolete checkpoint: %s", tbuf);
×
1171
    if (taosIsDir(tbuf)) {
×
1172
      taosRemoveDir(tbuf);
×
1173
    }
1174
  }
1175
  taosArrayDestroy(chkpDel);
2,195✔
1176
  return 0;
2,195✔
1177
_exception:
×
1178
  taosArrayDestroy(chkpDup);
×
1179
  taosArrayDestroy(chkpDel);
×
1180
  TAOS_UNUSED(taosThreadRwlockUnlock(&pBackend->chkpDirLock));
×
1181
  return code;
×
1182
}
1183

1184
int chkpIdComp(const void* a, const void* b) {
14✔
1185
  int64_t x = *(int64_t*)a;
14✔
1186
  int64_t y = *(int64_t*)b;
14✔
1187
  if (x == y) return 0;
14!
1188

1189
  return x < y ? -1 : 1;
14!
1190
}
1191
int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
4,679✔
1192
  int32_t code = 0;
4,679✔
1193
  int32_t nBytes = 0;
4,679✔
1194
  int32_t cap = 256;
4,679✔
1195
  char*   pChkpDir = taosMemoryCalloc(1, cap);
4,679!
1196
  if (pChkpDir == NULL) {
4,679!
1197
    return terrno;
×
1198
  }
1199

1200
  nBytes = snprintf(pChkpDir, cap, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
4,679✔
1201
  if (nBytes >= cap) {
4,679!
1202
    return TSDB_CODE_OUT_OF_RANGE;
×
1203
  }
1204
  if (!taosIsDir(pChkpDir)) {
4,679!
1205
    taosMemoryFree(pChkpDir);
×
1206
    return 0;
×
1207
  }
1208
  TdDirPtr pDir = taosOpenDir(pChkpDir);
4,679✔
1209
  if (pDir == NULL) {
4,679!
1210
    taosMemoryFree(pChkpDir);
×
1211
    return 0;
×
1212
  }
1213
  TdDirEntryPtr de = NULL;
4,679✔
1214
  while ((de = taosReadDir(pDir)) != NULL) {
14,087✔
1215
    if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
9,408✔
1216

1217
    if (taosDirEntryIsDir(de)) {
52!
1218
      char    checkpointPrefix[32] = {0};
53✔
1219
      int64_t checkpointId = 0;
53✔
1220

1221
      int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId);
53✔
1222
      if (ret == 1) {
53!
1223
        if (taosArrayPush(pBackend->chkpSaved, &checkpointId) == NULL) {
106!
1224
          TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1225
        }
1226
      }
1227
    } else {
1228
      continue;
×
1229
    }
1230
  }
1231
  taosArraySort(pBackend->chkpSaved, chkpIdComp);
4,675✔
1232

1233
  taosMemoryFree(pChkpDir);
4,675!
1234
  TAOS_UNUSED(taosCloseDir(&pDir));
4,679✔
1235

1236
  return 0;
4,679✔
1237
_exception:
×
1238
  taosMemoryFree(pChkpDir);
×
1239
  TAOS_UNUSED(taosCloseDir(&pDir));
×
1240
  return code;
×
1241
}
1242
int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
2,189✔
1243
  int32_t code = 0;
2,189✔
1244
  SArray* pHandle = taosArrayInit(8, POINTER_BYTES);
2,189✔
1245
  for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
17,537✔
1246
    if (pBackend->pCf[i]) {
15,344✔
1247
      rocksdb_column_family_handle_t* p = pBackend->pCf[i];
5,202✔
1248
      if (taosArrayPush(pHandle, &p) == NULL) {
5,202!
1249
        code = terrno;
×
1250
        goto _exception;
×
1251
      }
1252
    }
1253
  }
1254
  int32_t nCf = taosArrayGetSize(pHandle);
2,193✔
1255
  if (nCf == 0) {
2,192!
1256
    taosArrayDestroy(pHandle);
×
1257
    return nCf;
×
1258
  }
1259

1260
  rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
2,192!
1261
  if (ppCf == NULL) {
2,195!
1262
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1263
  }
1264
  for (int i = 0; i < nCf; i++) {
7,400✔
1265
    ppCf[i] = taosArrayGetP(pHandle, i);
5,205✔
1266
  }
1267

1268
  taosArrayDestroy(pHandle);
2,195✔
1269

1270
  *ppHandle = ppCf;
2,195✔
1271
  return nCf;
2,195✔
1272
_exception:
×
1273
  taosArrayDestroy(pHandle);
×
1274
  return code;
×
1275
}
1276

1277
int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
2,192✔
1278
  int32_t               code = -1;
2,192✔
1279
  char*                 err = NULL;
2,192✔
1280
  rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(db, &err);
2,192✔
1281
  if (cp == NULL || err != NULL) {
2,195!
1282
    stError("failed to do checkpoint at:%s, reason:%s", path, err);
×
1283
    taosMemoryFreeClear(err);
×
1284
    code = TSDB_CODE_THIRDPARTY_ERROR;
×
1285
    goto _ERROR;
×
1286
  }
1287
  rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err);
2,195✔
1288
  if (err != NULL) {
2,195!
1289
    stError("failed to do checkpoint at:%s, reason:%s", path, err);
×
1290
    taosMemoryFreeClear(err);
×
1291
    code = TSDB_CODE_THIRDPARTY_ERROR;
×
1292
  } else {
1293
    code = 0;
2,195✔
1294
  }
1295
_ERROR:
2,195✔
1296
  rocksdb_checkpoint_object_destroy(cp);
2,195✔
1297
  return code;
2,195✔
1298
}
1299

1300
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
1,838✔
1301
  int   code = 0;
1,838✔
1302
  char* err = NULL;
1,838✔
1303

1304
  rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
1,838✔
1305
  if (flushOpt == NULL) {
1,838!
1306
    return TSDB_CODE_OUT_OF_MEMORY;
×
1307
  }
1308

1309
  rocksdb_flushoptions_set_wait(flushOpt, 1);
1,838✔
1310

1311
  rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err);
1,838✔
1312
  if (err != NULL) {
1,839!
1313
    stError("failed to flush db before streamBackend clean up, reason:%s", err);
×
1314
    taosMemoryFree(err);
×
1315
    code = TSDB_CODE_THIRDPARTY_ERROR;
×
1316
  }
1317
  rocksdb_flushoptions_destroy(flushOpt);
1,839✔
1318
  return code;
1,839✔
1319
}
1320

1321
int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) {
2,194✔
1322
  int32_t code = 0;
2,194✔
1323
  int32_t cap = strlen(path) + 256;
2,194✔
1324
  int32_t nBytes = 0;
2,194✔
1325

1326
  char* pChkpDir = taosMemoryCalloc(1, cap);
2,194!
1327
  char* pChkpIdDir = taosMemoryCalloc(1, cap);
2,191!
1328
  if (pChkpDir == NULL || pChkpIdDir == NULL) {
2,193!
1329
    code = terrno;
×
1330
    goto _EXIT;
×
1331
  }
1332

1333
  nBytes = snprintf(pChkpDir, cap, "%s%s%s", path, TD_DIRSEP, "checkpoints");
2,193✔
1334
  if (nBytes <= 0 || nBytes >= cap) {
2,193!
1335
    code = TSDB_CODE_OUT_OF_RANGE;
1✔
1336
    goto _EXIT;
1✔
1337
  }
1338

1339
  nBytes = snprintf(pChkpIdDir, cap, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId);
2,192✔
1340
  if (nBytes <= 0 || nBytes >= cap) {
2,192!
1341
    code = TSDB_CODE_OUT_OF_RANGE;
×
1342
    goto _EXIT;
×
1343
  }
1344

1345
  code = taosMulModeMkDir(pChkpDir, 0755, true);
2,194✔
1346
  if (code != 0) {
2,187!
1347
    code = terrno;
×
1348
    stError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
×
1349
    goto _EXIT;
×
1350
  }
1351

1352
  if (taosIsDir(pChkpIdDir)) {
2,187!
UNCOV
1353
    stInfo("stream rm exist checkpoint%s", pChkpIdDir);
×
UNCOV
1354
    taosRemoveDir(pChkpIdDir);
×
1355
  }
1356

1357
  *chkpDir = pChkpDir;
2,191✔
1358
  *chkpIdDir = pChkpIdDir;
2,191✔
1359
  return 0;
2,191✔
1360
_EXIT:
×
1361
  taosMemoryFree(pChkpDir);
×
1362
  taosMemoryFree(pChkpIdDir);
×
1363
  return code;
×
1364
}
1365

1366
int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
63✔
1367
  // vnode task->db
1368
  SStreamMeta* pMeta = arg;
63✔
1369

1370
  streamMutexLock(&pMeta->backendMutex);
63✔
1371
  void*   pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL);
63✔
1372
  int32_t code = 0;
63✔
1373

1374
  while (pIter) {
69✔
1375
    STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;
6✔
1376

1377
    void* p = taskDbAddRef(pTaskDb);
6✔
1378
    if (p == NULL) {
6!
1379
      terrno = 0;
×
1380
      pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
×
1381
      continue;
×
1382
    }
1383

1384
    // add chkpId to in-use-ckpkIdSet
1385
    taskDbRefChkp(pTaskDb, pTaskDb->chkpId);
6✔
1386

1387
    code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId, ((SStreamTask*)pTaskDb->pTask)->chkInfo.processedVer);
6✔
1388
    if (code != 0) {
6!
1389
      // remove chkpId from in-use-ckpkIdSet
1390
      taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
×
1391
      taskDbRemoveRef(pTaskDb);
×
1392
      break;
×
1393
    }
1394

1395
    SStreamTask*    pTask = pTaskDb->pTask;
6✔
1396
    SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
12✔
1397
                            .taskId = pTask->id.taskId,
6✔
1398
                            .chkpId = pTaskDb->chkpId,
6✔
1399
                            .dbPrefixPath = taosStrdup(pTaskDb->path)};
6!
1400
    if (snap.dbPrefixPath == NULL) {
6!
1401
      // remove chkpid from chkp-in-use set
1402
      taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
×
1403
      taskDbRemoveRef(pTaskDb);
×
1404
      code = terrno;
×
1405
      break;
×
1406
    }
1407
    if (taosArrayPush(pSnap, &snap) == NULL) {
6!
1408
      taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
×
1409
      taskDbRemoveRef(pTaskDb);
×
1410
      code = terrno;
×
1411
      break;
×
1412
    }
1413

1414
    taskDbRemoveRef(pTaskDb);
6✔
1415
    pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
6✔
1416
  }
1417
  streamMutexUnlock(&pMeta->backendMutex);
63✔
1418
  return code;
63✔
1419
}
1420
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
69✔
1421
  if (pSnapInfo == NULL) return 0;
69✔
1422
  SStreamMeta* pMeta = arg;
63✔
1423
  int32_t      code = 0;
63✔
1424
  int32_t      cap = 256;
63✔
1425
  int32_t      nBytes = 0;
63✔
1426
  streamMutexLock(&pMeta->backendMutex);
63✔
1427

1428
  char buf[256] = {0};
63✔
1429
  for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) {
69✔
1430
    SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i);
6✔
1431
    nBytes = snprintf(buf, cap, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
6✔
1432
    if (nBytes <= 0 || nBytes >= cap) {
6!
1433
      code = TSDB_CODE_OUT_OF_RANGE;
×
1434
      break;
×
1435
    }
1436
    STaskDbWrapper** pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
6✔
1437
    if (pTaskDb == NULL || *pTaskDb == NULL) {
6!
1438
      stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId);
6!
1439
      memset(buf, 0, sizeof(buf));
6✔
1440
      continue;
6✔
1441
    }
UNCOV
1442
    memset(buf, 0, sizeof(buf));
×
1443

UNCOV
1444
    taskDbUnRefChkp(*pTaskDb, pSnap->chkpId);
×
1445
  }
1446
  streamMutexUnlock(&pMeta->backendMutex);
63✔
1447
  return code;
63✔
1448
}
1449
#ifdef BUILD_NO_CALL
1450
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
1451
  // if (arg == NULL) return 0;
1452

1453
  // SStreamMeta* pMeta = arg;
1454
  // taosWLockLatch(&pMeta->chkpDirLock);
1455
  // taosArrayPush(pMeta->chkpInUse, &chkpId);
1456
  // taosWUnLockLatch(&pMeta->chkpDirLock);
1457
  return 0;
1458
}
1459
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
1460
  return 0;
1461
  // if (arg == NULL) return 0;
1462

1463
  // SStreamMeta* pMeta = arg;
1464
  // taosWLockLatch(&pMeta->chkpDirLock);
1465
  // if (taosArrayGetSize(pMeta->chkpInUse) > 0) {
1466
  //   int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
1467
  //   if (id == chkpId) {
1468
  //     taosArrayPopFrontBatch(pMeta->chkpInUse, 1);
1469
  //   }
1470
  // }
1471
  // taosWUnLockLatch(&pMeta->chkpDirLock);
1472
}
1473
#endif
1474

1475
/*
1476
   0
1477
*/
1478

1479
void* taskAcquireDb(int64_t refId) {
×
1480
  // acquire
1481
  void* p = taosAcquireRef(taskDbWrapperId, refId);
×
1482
  return p;
×
1483
}
1484
void taskReleaseDb(int64_t refId) {
×
1485
  // release
1486
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
×
1487
}
×
1488

1489
int64_t taskGetDBRef(void* arg) {
×
1490
  if (arg == NULL) return -1;
×
1491
  STaskDbWrapper* pDb = arg;
×
1492
  return pDb->refId;
×
1493
}
1494

1495
int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) {
4,681✔
1496
  TdFilePtr pFile = NULL;
4,681✔
1497
  int32_t   code = 0;
4,681✔
1498

1499
  char    buf[256] = {0};
4,681✔
1500
  int32_t nBytes = 0;
4,681✔
1501

1502
  int32_t len = strlen(pChkpIdDir);
4,681✔
1503
  if (len == 0) {
4,681!
1504
    code = TSDB_CODE_INVALID_PARA;
×
1505
    stError("failed to load extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1506
    return code;
×
1507
  }
1508

1509
  int32_t cap = len + 64;
4,681✔
1510
  char*   pDst = taosMemoryCalloc(1, cap);
4,681!
1511
  if (pDst == NULL) {
4,681!
1512
    code = terrno;
×
1513
    stError("failed to alloc memory to load extra info, dir:%s", pChkpIdDir);
×
1514
    goto _EXIT;
×
1515
  }
1516

1517
  nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
4,681✔
1518
  if (nBytes <= 0 || nBytes >= cap) {
4,681!
1519
    code = TSDB_CODE_OUT_OF_RANGE;
×
1520
    stError("failed to build dst to load extra info, dir:%s", pChkpIdDir);
×
1521
    goto _EXIT;
×
1522
  }
1523

1524
  pFile = taosOpenFile(pDst, TD_FILE_READ);
4,681✔
1525
  if (pFile == NULL) {
4,681✔
1526
    // compatible with previous version
1527
    *processId = -1;
4,649✔
1528
    code = 0;
4,649✔
1529
    stWarn("failed to open file to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno));
4,649!
1530
    goto _EXIT;
4,649✔
1531
  }
1532

1533
  if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
32!
1534
    code = terrno;
×
1535
    stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1536
    goto _EXIT;
×
1537
  }
1538

1539
  if (sscanf(buf, "%" PRId64 " %" PRId64 "", chkpId, processId) < 2) {
32!
1540
    code = TSDB_CODE_INVALID_PARA;
×
1541
    stError("failed to read file content to load extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1542
    goto _EXIT;
×
1543
  }
1544
  code = 0;
32✔
1545
_EXIT:
4,681✔
1546
  taosMemoryFree(pDst);
4,681!
1547
  TAOS_UNUSED(taosCloseFile(&pFile));
4,681✔
1548
  return code;
4,681✔
1549
}
1550
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
2,195✔
1551
  int32_t code = 0;
2,195✔
1552

1553
  TdFilePtr pFile = NULL;
2,195✔
1554

1555
  char    buf[256] = {0};
2,195✔
1556
  int32_t nBytes = 0;
2,195✔
1557

1558
  int32_t len = strlen(pChkpIdDir);
2,195✔
1559
  if (len == 0) {
2,195!
1560
    code = TSDB_CODE_INVALID_PARA;
×
1561
    stError("failed to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1562
    return code;
×
1563
  }
1564

1565
  int32_t cap = len + 64;
2,195✔
1566
  char*   pDst = taosMemoryCalloc(1, cap);
2,195!
1567
  if (pDst == NULL) {
2,195!
1568
    code = terrno;
×
1569
    stError("failed to alloc memory to add extra info, dir:%s", pChkpIdDir);
×
1570
    goto _EXIT;
×
1571
  }
1572

1573
  nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
2,195✔
1574
  if (nBytes <= 0 || nBytes >= cap) {
2,195!
1575
    code = TSDB_CODE_OUT_OF_RANGE;
×
1576
    stError("failed to build dst to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1577
    goto _EXIT;
×
1578
  }
1579

1580
  pFile = taosOpenFile(pDst, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
2,195✔
1581
  if (pFile == NULL) {
2,194!
1582
    code = terrno;
×
1583
    stError("failed to open file to add extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1584
    goto _EXIT;
×
1585
  }
1586

1587
  nBytes = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId);
2,194✔
1588
  if (nBytes <= 0 || nBytes >= sizeof(buf)) {
2,194!
1589
    code = TSDB_CODE_OUT_OF_RANGE;
×
1590
    stError("failed to build content to add extra info, dir:%s,reason:%s", pChkpIdDir, tstrerror(code));
×
1591
    goto _EXIT;
×
1592
  }
1593

1594
  if (nBytes != taosWriteFile(pFile, buf, nBytes)) {
2,195!
1595
    code = terrno;
×
1596
    stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1597
    goto _EXIT;
×
1598
  }
1599
  code = 0;
2,195✔
1600

1601
_EXIT:
2,195✔
1602
  TAOS_UNUSED(taosCloseFile(&pFile));
2,195✔
1603
  taosMemoryFree(pDst);
2,195!
1604
  return code;
2,195✔
1605
}
1606
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId) {
2,192✔
1607
  STaskDbWrapper* pTaskDb = arg;
2,192✔
1608
  int64_t         st = taosGetTimestampMs();
2,192✔
1609
  int32_t         code = 0;
2,192✔
1610
  int64_t         refId = pTaskDb->refId;
2,192✔
1611

1612
  if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
2,192!
1613
    code = terrno;
×
1614
    return code;
×
1615
  }
1616

1617
  char* pChkpDir = NULL;
2,195✔
1618
  char* pChkpIdDir = NULL;
2,195✔
1619
  if ((code = chkpPreBuildDir(pTaskDb->path, chkpId, &pChkpDir, &pChkpIdDir)) < 0) {
2,195!
1620
    goto _EXIT;
×
1621
  }
1622
  // Get all cf and acquire cfWrappter
1623
  rocksdb_column_family_handle_t** ppCf = NULL;
2,189✔
1624

1625
  int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
2,189✔
1626
  stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
2,195✔
1627

1628
  int64_t written = atomic_load_64(&pTaskDb->dataWritten);
2,195✔
1629

1630
  // flush db
1631
  if (written > 0) {
2,195✔
1632
    stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
1,839✔
1633
    code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf);
1,839✔
1634
    if (code != 0) goto _EXIT;
1,839!
1635
  } else {
1636
    stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
356✔
1637
  }
1638

1639
  // do checkpoint
1640
  if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
2,195!
1641
    stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
×
1642
    goto _EXIT;
×
1643
  } else {
1644
    stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
3,580✔
1645
            taosGetTimestampMs() - st);
1646
  }
1647

1648
  // add extra info to checkpoint
1649
  if ((code = chkpAddExtraInfo(pChkpIdDir, chkpId, processId)) != 0) {
2,195!
1650
    stError("stream backend:%p failed to add extra info to checkpoint at:%s", pTaskDb, pChkpIdDir);
×
1651
    goto _EXIT;
×
1652
  }
1653

1654
  // delete ttl checkpoint
1655
  code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
2,195✔
1656
  if (code < 0) {
2,195!
1657
    goto _EXIT;
×
1658
  }
1659

1660
  TAOS_UNUSED(atomic_store_64(&pTaskDb->dataWritten, 0));
2,195✔
1661
  pTaskDb->chkpId = chkpId;
2,195✔
1662

1663
_EXIT:
2,195✔
1664

1665
  // clear checkpoint dir if failed
1666
  if (code != 0 && pChkpDir != NULL) {
2,195!
1667
    if (taosDirExist(pChkpIdDir)) {
×
1668
      TAOS_UNUSED(taosRemoveDir(pChkpIdDir));
×
1669
    }
1670
  }
1671
  taosMemoryFree(pChkpIdDir);
2,195!
1672
  taosMemoryFree(pChkpDir);
2,195!
1673

1674
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
2,195✔
1675
  taosMemoryFree(ppCf);
2,195!
1676
  return code;
2,195✔
1677
}
1678

1679
int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId, int64_t processVer) {
2,183✔
1680
  return taskDbDoCheckpoint(arg, chkpId, processVer);
2,183✔
1681
}
1682

1683
SListNode* streamBackendAddCompare(void* backend, void* arg) {
×
1684
  SBackendWrapper* pHandle = (SBackendWrapper*)backend;
×
1685
  SListNode*       node = NULL;
×
1686
  streamMutexLock(&pHandle->mutex);
×
1687
  node = tdListAdd(pHandle->list, arg);
×
1688
  streamMutexUnlock(&pHandle->mutex);
×
1689
  return node;
×
1690
}
1691
void streamBackendDelCompare(void* backend, void* arg) {
×
1692
  SBackendWrapper* pHandle = (SBackendWrapper*)backend;
×
1693
  SListNode*       node = NULL;
×
1694
  streamMutexLock(&pHandle->mutex);
×
1695
  node = tdListPopNode(pHandle->list, arg);
×
1696
  streamMutexUnlock(&pHandle->mutex);
×
1697
  if (node) {
×
1698
    streamStateDestroyCompar(node->data);
×
1699
    taosMemoryFree(node);
×
1700
  }
1701
}
×
1702
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
×
1703
  int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
×
1704
  if (inst->pHandle) {
×
1705
    for (int i = 0; i < cfLen; i++) {
×
1706
      if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]);
×
1707
    }
1708
    taosMemoryFree(inst->pHandle);
×
1709
  }
1710

1711
  if (inst->cfOpt) {
×
1712
    for (int i = 0; i < cfLen; i++) {
×
1713
      rocksdb_options_destroy(inst->cfOpt[i]);
×
1714
      rocksdb_block_based_options_destroy(((RocksdbCfParam*)inst->param)[i].tableOpt);
×
1715
    }
1716
    taosMemoryFreeClear(inst->cfOpt);
×
1717
    taosMemoryFreeClear(inst->param);
×
1718
  }
1719
  if (inst->wOpt) rocksdb_writeoptions_destroy(inst->wOpt);
×
1720
  if (inst->rOpt) rocksdb_readoptions_destroy(inst->rOpt);
×
1721

1722
  taosMemoryFree(inst);
×
1723
}
×
1724

1725
// |key|-----value------|
1726
// |key|ttl|len|userData
1727

1728
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
28,551✔
1729
  int len = aLen < bLen ? aLen : bLen;
28,551✔
1730
  int ret = memcmp(aBuf, bBuf, len);
28,551✔
1731
  if (ret == 0) {
28,551✔
1732
    if (aLen < bLen)
8,664✔
1733
      return -1;
67✔
1734
    else if (aLen > bLen)
8,597✔
1735
      return 1;
40✔
1736
    else
1737
      return 0;
8,557✔
1738
  } else {
1739
    return ret;
19,887✔
1740
  }
1741
}
1742
int streamStateValueIsStale(char* v) {
31,030✔
1743
  int64_t ts = 0;
31,030!
1744
  TAOS_UNUSED(taosDecodeFixedI64(v, &ts));
1745
  return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0;
31,030!
1746
}
1747
int iterValueIsStale(rocksdb_iterator_t* iter) {
15,656✔
1748
  size_t len;
1749
  char*  v = (char*)rocksdb_iter_value(iter, &len);
15,656✔
1750
  return streamStateValueIsStale(v);
15,655✔
1751
}
1752
int defaultKeyEncode(void* k, char* buf) {
26,594✔
1753
  int len = strlen((char*)k);
26,594✔
1754
  memcpy(buf, (char*)k, len);
26,594✔
1755
  return len;
26,594✔
1756
}
1757
int defaultKeyDecode(void* k, char* buf) {
×
1758
  int len = strlen(buf);
×
1759
  memcpy(k, buf, len);
×
1760
  return len;
×
1761
}
1762
int defaultKeyToString(void* k, char* buf) {
15,368✔
1763
  // just to debug
1764
  return sprintf(buf, "key: %s", (char*)k);
15,368✔
1765
}
1766
//
1767
//  SStateKey
1768
//  |--groupid--|---ts------|--opNum----|
1769
//  |--uint64_t-|-uint64_t--|--int64_t--|
1770
//
1771
//
1772
//
1773
int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
31,810,505✔
1774
  SStateKey key1, key2;
1775
  memset(&key1, 0, sizeof(key1));
31,810,505✔
1776
  memset(&key2, 0, sizeof(key2));
31,810,505✔
1777

1778
  char* p1 = (char*)aBuf;
31,810,505✔
1779
  char* p2 = (char*)bBuf;
31,810,505!
1780

1781
  p1 = taosDecodeFixedU64(p1, &key1.key.groupId);
31,810,505!
1782
  p2 = taosDecodeFixedU64(p2, &key2.key.groupId);
31,810,505!
1783

1784
  p1 = taosDecodeFixedI64(p1, &key1.key.ts);
31,810,505✔
1785
  p2 = taosDecodeFixedI64(p2, &key2.key.ts);
31,810,505✔
1786

1787
  TAOS_UNUSED(taosDecodeFixedI64(p1, &key1.opNum));
1788
  TAOS_UNUSED(taosDecodeFixedI64(p2, &key2.opNum));
1789

1790
  return stateKeyCmpr(&key1, sizeof(key1), &key2, sizeof(key2));
31,810,505✔
1791
}
1792

1793
int stateKeyEncode(void* k, char* buf) {
1,974,951✔
1794
  SStateKey* key = k;
1,974,951✔
1795
  int        len = 0;
1,974,951✔
1796
  len += taosEncodeFixedU64((void**)&buf, key->key.groupId);
1,974,951✔
1797
  len += taosEncodeFixedI64((void**)&buf, key->key.ts);
1,974,951✔
1798
  len += taosEncodeFixedI64((void**)&buf, key->opNum);
1,974,951!
1799
  return len;
1,974,951✔
1800
}
1801
int stateKeyDecode(void* k, char* buf) {
18✔
1802
  SStateKey* key = k;
18✔
1803
  int        len = 0;
18✔
1804
  char*      p = buf;
18✔
1805
  p = taosDecodeFixedU64(p, &key->key.groupId);
18!
1806
  p = taosDecodeFixedI64(p, &key->key.ts);
18!
1807
  p = taosDecodeFixedI64(p, &key->opNum);
18!
1808
  return p - buf;
18✔
1809
}
1810

1811
int stateKeyToString(void* k, char* buf) {
1,818,020✔
1812
  SStateKey* key = k;
1,818,020✔
1813
  int        n = 0;
1,818,020✔
1814
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->key.groupId);
1,818,020✔
1815
  n += sprintf(buf + n, "ts:%" PRIi64 ",", key->key.ts);
1,818,020✔
1816
  n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
1,818,020✔
1817
  return n;
1,818,020✔
1818
}
1819

1820
//
1821
// SStateSessionKey
1822
//  |-----------SSessionKey----------|
1823
//  |-----STimeWindow-----|
1824
//  |---skey--|---ekey----|--groupId-|--opNum--|
1825
//  |---int64-|--int64_t--|--uint64--|--int64_t|
1826
// |
1827
//
1828
int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
412,402✔
1829
  SStateSessionKey w1, w2;
1830
  memset(&w1, 0, sizeof(w1));
412,402✔
1831
  memset(&w2, 0, sizeof(w2));
412,402✔
1832

1833
  char* p1 = (char*)aBuf;
412,402✔
1834
  char* p2 = (char*)bBuf;
412,402✔
1835

1836
  p1 = taosDecodeFixedI64(p1, &w1.key.win.skey);
412,402✔
1837
  p2 = taosDecodeFixedI64(p2, &w2.key.win.skey);
412,402✔
1838

1839
  p1 = taosDecodeFixedI64(p1, &w1.key.win.ekey);
412,402✔
1840
  p2 = taosDecodeFixedI64(p2, &w2.key.win.ekey);
412,402✔
1841

1842
  p1 = taosDecodeFixedU64(p1, &w1.key.groupId);
412,402!
1843
  p2 = taosDecodeFixedU64(p2, &w2.key.groupId);
412,402✔
1844

1845
  p1 = taosDecodeFixedI64(p1, &w1.opNum);
412,402✔
1846
  p2 = taosDecodeFixedI64(p2, &w2.opNum);
412,402✔
1847

1848
  return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
412,402✔
1849
}
1850
int stateSessionKeyEncode(void* k, char* buf) {
21,524✔
1851
  SStateSessionKey* sess = k;
21,524✔
1852
  int               len = 0;
21,524✔
1853
  len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey);
21,524✔
1854
  len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey);
21,524!
1855
  len += taosEncodeFixedU64((void**)&buf, sess->key.groupId);
21,524!
1856
  len += taosEncodeFixedI64((void**)&buf, sess->opNum);
21,524✔
1857
  return len;
21,524✔
1858
}
1859
int stateSessionKeyDecode(void* k, char* buf) {
9,680✔
1860
  SStateSessionKey* sess = k;
9,680✔
1861
  int               len = 0;
9,680✔
1862

1863
  char* p = buf;
9,680✔
1864
  p = taosDecodeFixedI64(p, &sess->key.win.skey);
9,680!
1865
  p = taosDecodeFixedI64(p, &sess->key.win.ekey);
9,680!
1866
  p = taosDecodeFixedU64(p, &sess->key.groupId);
9,680!
1867
  p = taosDecodeFixedI64(p, &sess->opNum);
9,680!
1868
  return p - buf;
9,680✔
1869
}
1870
int stateSessionKeyToString(void* k, char* buf) {
1,741✔
1871
  SStateSessionKey* key = k;
1,741✔
1872
  int               n = 0;
1,741✔
1873
  n += sprintf(buf + n, "[skey:%" PRIi64 ",", key->key.win.skey);
1,741✔
1874
  n += sprintf(buf + n, "ekey:%" PRIi64 ",", key->key.win.ekey);
1,741✔
1875
  n += sprintf(buf + n, "groupId:%" PRIu64 ",", key->key.groupId);
1,741✔
1876
  n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
1,741✔
1877
  return n;
1,741✔
1878
}
1879

1880
/**
1881
 *  SWinKey
1882
 *  |------groupId------|-----ts------|
1883
 *  |------uint64-------|----int64----|
1884
 */
1885
int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
74,806✔
1886
  SWinKey w1, w2;
1887
  memset(&w1, 0, sizeof(w1));
74,806✔
1888
  memset(&w2, 0, sizeof(w2));
74,806✔
1889

1890
  char* p1 = (char*)aBuf;
74,806✔
1891
  char* p2 = (char*)bBuf;
74,806!
1892

1893
  p1 = taosDecodeFixedU64(p1, &w1.groupId);
74,806!
1894
  p2 = taosDecodeFixedU64(p2, &w2.groupId);
74,806!
1895

1896
  p1 = taosDecodeFixedI64(p1, &w1.ts);
74,806!
1897
  p2 = taosDecodeFixedI64(p2, &w2.ts);
74,806✔
1898

1899
  return winKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
74,806✔
1900
}
1901

1902
int winKeyEncode(void* k, char* buf) {
7,475✔
1903
  SWinKey* key = k;
7,475✔
1904
  int      len = 0;
7,475✔
1905
  len += taosEncodeFixedU64((void**)&buf, key->groupId);
7,475!
1906
  len += taosEncodeFixedI64((void**)&buf, key->ts);
7,475!
1907
  return len;
7,475✔
1908
}
1909

1910
int winKeyDecode(void* k, char* buf) {
5,544✔
1911
  SWinKey* key = k;
5,544✔
1912
  int      len = 0;
5,544✔
1913
  char*    p = buf;
5,544✔
1914
  p = taosDecodeFixedU64(p, &key->groupId);
5,544!
1915
  p = taosDecodeFixedI64(p, &key->ts);
5,544!
1916
  return len;
5,544✔
1917
}
1918

1919
int winKeyToString(void* k, char* buf) {
1,418✔
1920
  SWinKey* key = k;
1,418✔
1921
  int      n = 0;
1,418✔
1922
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
1,418✔
1923
  n += sprintf(buf + n, "ts:%" PRIi64 "]", key->ts);
1,418✔
1924
  return n;
1,418✔
1925
}
1926
/*
1927
 * STupleKey
1928
 * |---groupId---|---ts---|---exprIdx---|
1929
 * |---uint64--|---int64--|---int32-----|
1930
 */
1931
int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
3,024✔
1932
  STupleKey w1, w2;
1933
  memset(&w1, 0, sizeof(w1));
3,024✔
1934
  memset(&w2, 0, sizeof(w2));
3,024✔
1935

1936
  char* p1 = (char*)aBuf;
3,024✔
1937
  char* p2 = (char*)bBuf;
3,024!
1938

1939
  p1 = taosDecodeFixedU64(p1, &w1.groupId);
3,024!
1940
  p2 = taosDecodeFixedU64(p2, &w2.groupId);
3,024!
1941

1942
  p1 = taosDecodeFixedI64(p1, &w1.ts);
3,024!
1943
  p2 = taosDecodeFixedI64(p2, &w2.ts);
3,024!
1944

1945
  p1 = taosDecodeFixedI32(p1, &w1.exprIdx);
3,024!
1946
  p2 = taosDecodeFixedI32(p2, &w2.exprIdx);
3,024✔
1947

1948
  return STupleKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
3,024✔
1949
}
1950

1951
int tupleKeyEncode(void* k, char* buf) {
300✔
1952
  STupleKey* key = k;
300✔
1953
  int        len = 0;
300✔
1954
  len += taosEncodeFixedU64((void**)&buf, key->groupId);
300!
1955
  len += taosEncodeFixedI64((void**)&buf, key->ts);
300!
1956
  len += taosEncodeFixedI32((void**)&buf, key->exprIdx);
300!
1957
  return len;
300✔
1958
}
1959
int tupleKeyDecode(void* k, char* buf) {
×
1960
  STupleKey* key = k;
×
1961
  int        len = 0;
×
1962
  char*      p = buf;
×
1963
  p = taosDecodeFixedU64(p, &key->groupId);
×
1964
  p = taosDecodeFixedI64(p, &key->ts);
×
1965
  p = taosDecodeFixedI32(p, &key->exprIdx);
×
1966
  return len;
×
1967
}
1968
int tupleKeyToString(void* k, char* buf) {
×
1969
  int        n = 0;
×
1970
  STupleKey* key = k;
×
1971
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
×
1972
  n += sprintf(buf + n, "ts:%" PRIi64 ",", key->ts);
×
1973
  n += sprintf(buf + n, "exprIdx:%d]", key->exprIdx);
×
1974
  return n;
×
1975
}
1976

1977
int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
34,457✔
1978
  int64_t w1, w2;
1979
  memset(&w1, 0, sizeof(w1));
1980
  memset(&w2, 0, sizeof(w2));
1981
  char* p1 = (char*)aBuf;
34,457✔
1982
  char* p2 = (char*)bBuf;
34,457!
1983

1984
  TAOS_UNUSED(taosDecodeFixedI64(p1, &w1));
1985
  TAOS_UNUSED(taosDecodeFixedI64(p2, &w2));
1986
  if (w1 == w2) {
34,457✔
1987
    return 0;
2,101✔
1988
  } else {
1989
    return w1 < w2 ? -1 : 1;
32,356✔
1990
  }
1991
}
1992
int parKeyEncode(void* k, char* buf) {
568,742✔
1993
  int64_t* groupid = k;
568,742✔
1994
  int      len = taosEncodeFixedI64((void**)&buf, *groupid);
568,742!
1995
  return len;
568,742✔
1996
}
1997
int parKeyDecode(void* k, char* buf) {
396✔
1998
  char*    p = buf;
396✔
1999
  int64_t* groupid = k;
396!
2000

2001
  p = taosDecodeFixedI64(p, groupid);
396✔
2002
  return p - buf;
396✔
2003
}
2004
int parKeyToString(void* k, char* buf) {
6,189✔
2005
  int64_t* key = k;
6,189✔
2006
  int      n = 0;
6,189✔
2007
  n = sprintf(buf + n, "[groupId:%" PRIi64 "]", *key);
6,189✔
2008
  return n;
6,189✔
2009
}
2010
int32_t valueToString(void* k, char* buf) {
×
2011
  SStreamValue* key = k;
×
2012
  int           n = 0;
×
2013
  n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp);
×
2014
  n += sprintf(buf + n, "len:%d,", key->len);
×
2015
  n += sprintf(buf + n, "data:%s]", key->data);
×
2016
  return n;
×
2017
}
2018

2019
/*1: stale, 0: no stale*/
2020
int32_t valueIsStale(void* k, int64_t ts) {
×
2021
  SStreamValue* key = k;
×
2022
  if (key->unixTimestamp < ts) {
×
2023
    return 1;
×
2024
  }
2025
  return 0;
×
2026
}
2027

2028
void destroyCompare(void* arg) {
31,605✔
2029
  TAOS_UNUSED(arg);
2030
  return;
31,605✔
2031
}
2032

2033
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
1,824,579✔
2034
  int32_t      code = 0;
1,824,579✔
2035
  SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)};
1,824,579✔
2036
  int32_t      len = 0;
1,824,579✔
2037
  char*        dst = NULL;
1,824,579✔
2038
  if (vlen > 512) {
1,824,579✔
2039
    dst = taosMemoryCalloc(1, vlen + 128);
484,169!
2040
    if (dst == NULL) {
486,000!
2041
      return terrno;
×
2042
    }
2043
    int32_t dstCap = vlen + 128;
486,000✔
2044
    int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap);
486,000✔
2045
    if (compressedSize < vlen) {
482,419!
2046
      key.compress = 1;
482,420✔
2047
      key.len = compressedSize;
482,420✔
2048
      value = dst;
482,420✔
2049
    }
2050
  }
2051

2052
  if (*dest == NULL) {
1,822,829✔
2053
    size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
19,221✔
2054
    char*  p = taosMemoryCalloc(1, size);
19,221!
2055
    if (p == NULL) {
19,225!
2056
      code = terrno;
×
2057
      goto _exception;
×
2058
    }
2059
    char* buf = p;
19,225✔
2060
    len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
19,225!
2061
    len += taosEncodeFixedI32((void**)&buf, key.len);
19,225!
2062
    len += taosEncodeFixedI32((void**)&buf, key.rawLen);
19,225!
2063
    len += taosEncodeFixedI8((void**)&buf, key.compress);
19,225✔
2064
    if (value != NULL && key.len != 0) {
19,225✔
2065
      len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
26,920!
2066
    }
2067
    *dest = p;
19,225✔
2068
  } else {
2069
    char* buf = *dest;
1,803,608✔
2070
    len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
1,803,608!
2071
    len += taosEncodeFixedI32((void**)&buf, key.len);
1,803,608!
2072
    len += taosEncodeFixedI32((void**)&buf, key.rawLen);
1,803,608!
2073
    len += taosEncodeFixedI8((void**)&buf, key.compress);
1,803,608!
2074
    if (value != NULL && key.len != 0) {
1,803,608!
2075
      len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
3,607,782!
2076
    }
2077
  }
2078

2079
  taosMemoryFree(dst);
1,822,833!
2080
  return len;
1,826,673✔
2081
_exception:
×
2082
  taosMemoryFree(dst);
×
2083
  return code;
×
2084
}
2085

2086
/*
2087
 *  ret >= 0 : found valid value
2088
 *  ret < 0 : error or timeout
2089
 */
2090
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
14,357✔
2091
  int32_t      code = -1;
14,357✔
2092
  SStreamValue key = {0};
14,357✔
2093
  char*        p = value;
14,357✔
2094

2095
  char* pCompressData = NULL;
14,357✔
2096
  char* pOutput = NULL;
14,357✔
2097
  if (streamStateValueIsStale(p)) {
14,357!
2098
    code = TSDB_CODE_INVALID_DATA_FMT;
×
2099
    goto _EXCEPT;
×
2100
  }
2101

2102
  p = taosDecodeFixedI64(p, &key.unixTimestamp);
14,358✔
2103
  p = taosDecodeFixedI32(p, &key.len);
14,358✔
2104
  if (key.len == 0) {
14,358!
2105
    code = 0;
×
2106
    goto _EXCEPT;
×
2107
  }
2108
  if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) {
14,358!
2109
    // compatiable with previous data
2110
    p = taosDecodeBinary(p, (void**)&pOutput, key.len);
×
2111
    if (p == NULL) {
×
2112
      code = terrno;
×
2113
      goto _EXCEPT;
×
2114
    }
2115

2116
  } else {
2117
    p = taosDecodeFixedI32(p, &key.rawLen);
14,358✔
2118
    p = taosDecodeFixedI8(p, &key.compress);
14,358✔
2119
    if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) {
14,358!
2120
      stError("vlen: %d, read len: %d", vlen, key.len);
×
2121
      code = TSDB_CODE_INVALID_DATA_FMT;
×
2122
      goto _EXCEPT;
×
2123
    }
2124
    if (key.compress == 1) {
14,358✔
2125
      p = taosDecodeBinary(p, (void**)&pCompressData, key.len);
6,168!
2126
      if (p == NULL) {
6,169!
2127
        code = terrno;
×
2128
        goto _EXCEPT;
×
2129
      }
2130
      pOutput = taosMemoryCalloc(1, key.rawLen);
6,169!
2131
      if (pOutput == NULL) {
6,169!
2132
        code = terrno;
×
2133
        goto _EXCEPT;
×
2134
      }
2135

2136
      int32_t rawLen = LZ4_decompress_safe(pCompressData, pOutput, key.len, key.rawLen);
6,169✔
2137
      if (rawLen != key.rawLen) {
6,168!
2138
        stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len);
×
2139
        code = TSDB_CODE_INVALID_DATA_FMT;
×
2140
        goto _EXCEPT;
×
2141
      }
2142
      key.len = rawLen;
6,168✔
2143
    } else {
2144
      p = taosDecodeBinary(p, (void**)&pOutput, key.len);
8,190✔
2145
      if (p == NULL) {
8,187!
2146
        code = terrno;
×
2147
        goto _EXCEPT;
×
2148
      }
2149
    }
2150
  }
2151

2152
  if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
14,355!
2153

2154
  code = 0;
14,355✔
2155
  if (dest) {
14,355✔
2156
    *dest = pOutput;
13,714✔
2157
    pOutput = NULL;
13,714✔
2158
  }
2159
  taosMemoryFree(pCompressData);
14,355!
2160
  taosMemoryFree(pOutput);
14,355!
2161
  return key.len;
14,356✔
2162

2163
_EXCEPT:
×
2164
  if (dest != NULL) *dest = NULL;
×
2165
  if (ttl != NULL) *ttl = 0;
×
2166

2167
  taosMemoryFree(pOutput);
×
2168
  taosMemoryFree(pCompressData);
×
2169
  return code;
×
2170
}
2171

2172
const char* compareDefaultName(void* arg) {
72,106✔
2173
  TAOS_UNUSED(arg);
2174
  return ginitDict[0].key;
72,106✔
2175
}
2176
const char* compareStateName(void* arg) {
21,917✔
2177
  TAOS_UNUSED(arg);
2178
  return ginitDict[1].key;
21,917✔
2179
}
2180
const char* compareWinKeyName(void* arg) { return ginitDict[2].key; }
1,721✔
2181
const char* compareSessionKeyName(void* arg) {
8,535✔
2182
  TAOS_UNUSED(arg);
2183
  return ginitDict[3].key;
8,535✔
2184
}
2185
const char* compareFuncKeyName(void* arg) {
15✔
2186
  TAOS_UNUSED(arg);
2187
  return ginitDict[4].key;
15✔
2188
}
2189
const char* compareParKeyName(void* arg) {
13,157✔
2190
  TAOS_UNUSED(arg);
2191
  return ginitDict[5].key;
13,157✔
2192
}
2193
const char* comparePartagKeyName(void* arg) {
410✔
2194
  TAOS_UNUSED(arg);
2195
  return ginitDict[6].key;
410✔
2196
}
2197

2198
void destroyCompactFilteFactory(void* arg) {
36,121✔
2199
  if (arg == NULL) return;
36,121!
2200
}
2201
const char* compactFilteFactoryName(void* arg) {
47,611✔
2202
  SCompactFilteFactory* state = arg;
47,611✔
2203
  return "stream_compact_factory_filter_default";
47,611✔
2204
}
2205
const char* compactFilteFactoryNameSess(void* arg) {
4,357✔
2206
  SCompactFilteFactory* state = arg;
4,357✔
2207
  return "stream_compact_factory_filter_sess";
4,357✔
2208
}
2209
const char* compactFilteFactoryNameState(void* arg) {
11,891✔
2210
  SCompactFilteFactory* state = arg;
11,891✔
2211
  return "stream_compact_factory_filter_state";
11,891✔
2212
}
2213
const char* compactFilteFactoryNameFill(void* arg) {
916✔
2214
  SCompactFilteFactory* state = arg;
916✔
2215
  return "stream_compact_factory_filter_fill";
916✔
2216
}
2217
const char* compactFilteFactoryNameFunc(void* arg) {
9✔
2218
  SCompactFilteFactory* state = arg;
9✔
2219
  return "stream_compact_factory_filter_func";
9✔
2220
}
2221

2222
void          destroyCompactFilte(void* arg) { TAOS_UNUSED(arg); }
476✔
2223
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
1,021✔
2224
                           char** newval, size_t* newvlen, unsigned char* value_changed) {
2225
  return streamStateValueIsStale((char*)val) ? 1 : 0;
1,021✔
2226
}
2227
const char* compactFilteName(void* arg) { return "stream_filte_default"; }
×
2228
const char* compactFilteNameSess(void* arg) { return "stream_filte_sess"; }
×
2229
const char* compactFilteNameState(void* arg) { return "stream_filte_state"; }
×
2230
const char* compactFilteNameFill(void* arg) { return "stream_filte_fill"; }
×
2231
const char* compactFilteNameFunc(void* arg) { return "stream_filte_func"; }
×
2232

2233
unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
839✔
2234
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
2235
  // not impl yet
2236
  return 0;
839✔
2237
}
2238

2239
unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
1,002,233✔
2240
                                char** newval, size_t* newvlen, unsigned char* value_changed) {
2241
  // not impl yet
2242
  return 0;
1,002,233✔
2243
}
2244

2245
unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
18✔
2246
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
2247
  // not impl yet
2248
  return 0;
18✔
2249
}
2250

2251
unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
×
2252
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
2253
  // not impl yet
2254
  return 0;
×
2255
  // return streamStateValueIsStale((char*)val) ? 1 : 0;
2256
}
2257

2258
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
286✔
2259
  SCompactFilteFactory*       state = arg;
286✔
2260
  rocksdb_compactionfilter_t* filter =
2261
      rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilte, compactFilteName);
286✔
2262
  return filter;
286✔
2263
}
2264
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
159✔
2265
  SCompactFilteFactory*       state = arg;
159✔
2266
  rocksdb_compactionfilter_t* filter =
2267
      rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteSess, compactFilteNameSess);
159✔
2268
  return filter;
159✔
2269
}
2270
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
25✔
2271
  SCompactFilteFactory*       state = arg;
25✔
2272
  rocksdb_compactionfilter_t* filter =
2273
      rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteState, compactFilteNameState);
25✔
2274
  return filter;
25✔
2275
}
2276
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
6✔
2277
  SCompactFilteFactory*       state = arg;
6✔
2278
  rocksdb_compactionfilter_t* filter =
2279
      rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteFill, compactFilteNameFill);
6✔
2280
  return filter;
6✔
2281
}
2282
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
×
2283
  SCompactFilteFactory*       state = arg;
×
2284
  rocksdb_compactionfilter_t* filter =
2285
      rocksdb_compactionfilter_create(state, destroyCompactFilte, compactFilteFunc, compactFilteNameFunc);
×
2286
  return filter;
×
2287
}
2288

2289
int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) {
4,679✔
2290
  int32_t code = -1;
4,679✔
2291
  char*   err = NULL;
4,679✔
2292

2293
  rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
4,679!
2294
  if (cfOpts == NULL) {
4,679!
2295
    return terrno;
×
2296
  }
2297
  rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
4,679!
2298
  if (cfHandle == NULL) {
4,679!
2299
    return terrno;
×
2300
  }
2301

2302
  for (int i = 0; i < nCf; i++) {
9,397✔
2303
    int32_t idx = getCfIdx(pCfNames[i]);
4,718✔
2304
    cfOpts[i] = pTask->pCfOpts[idx];
4,718✔
2305
  }
2306

2307
  rocksdb_t* db = rocksdb_open_column_families(pTask->dbOpt, path, nCf, (const char* const*)pCfNames,
4,679✔
2308
                                               (const rocksdb_options_t* const*)cfOpts, cfHandle, &err);
2309

2310
  if (err != NULL) {
4,679!
2311
    stError("failed to open cf path: %s", err);
×
2312
    taosMemoryFree(err);
×
2313
    goto _EXIT;
×
2314
  }
2315

2316
  for (int i = 0; i < nCf; i++) {
9,397✔
2317
    int32_t idx = getCfIdx(pCfNames[i]);
4,718✔
2318
    pTask->pCf[idx] = cfHandle[i];
4,718✔
2319
  }
2320

2321
  pTask->db = db;
4,679✔
2322
  code = 0;
4,679✔
2323

2324
_EXIT:
4,679✔
2325
  taosMemoryFree(cfOpts);
4,679!
2326
  taosMemoryFree(cfHandle);
4,679!
2327
  return code;
4,679✔
2328
}
2329

2330
void* taskDbAddRef(void* pTaskDb) {
2,467✔
2331
  STaskDbWrapper* pBackend = pTaskDb;
2,467✔
2332
  return taosAcquireRef(taskDbWrapperId, pBackend->refId);
2,467✔
2333
}
2334

2335
void taskDbRemoveRef(void* pTaskDb) {
6,981✔
2336
  if (pTaskDb == NULL) {
6,981!
2337
    return;
×
2338
  }
2339

2340
  STaskDbWrapper* pBackend = pTaskDb;
6,981✔
2341
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, pBackend->refId));
6,981✔
2342
}
2343

2344
void taskDbSetClearFileFlag(void* pTaskDb) {
3,460✔
2345
  if (pTaskDb == NULL) {
3,460!
2346
    return;
×
2347
  }
2348

2349
  STaskDbWrapper* pBackend = pTaskDb;
3,460✔
2350
  atomic_store_8(&pBackend->removeAllFiles, 1);
3,460✔
2351
}
2352

2353
void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
4,678✔
2354
  rocksdb_env_t* env = rocksdb_create_default_env();
4,678✔
2355

2356
  rocksdb_cache_t*   cache = rocksdb_cache_create_lru(256);
4,679✔
2357
  rocksdb_options_t* opts = rocksdb_options_create();
4,679✔
2358
  rocksdb_options_set_env(opts, env);
4,679✔
2359
  rocksdb_options_set_create_if_missing(opts, 1);
4,679✔
2360
  rocksdb_options_set_create_missing_column_families(opts, 1);
4,679✔
2361
  // rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
2362
  // rocksdb_options_set_ecycle_log_file_num(opts, 6);
2363
  rocksdb_options_set_max_write_buffer_number(opts, 3);
4,679✔
2364
  rocksdb_options_set_info_log_level(opts, 1);
4,678✔
2365
  rocksdb_options_set_db_write_buffer_size(opts, 256 << 20);
4,679✔
2366
  rocksdb_options_set_write_buffer_size(opts, 128 << 20);
4,679✔
2367
  rocksdb_options_set_atomic_flush(opts, 1);
4,679✔
2368

2369
  pTaskDb->dbOpt = opts;
4,679✔
2370
  pTaskDb->env = env;
4,679✔
2371
  pTaskDb->cache = cache;
4,679✔
2372
  pTaskDb->filterFactory = rocksdb_compactionfilterfactory_create(
4,679✔
2373
      NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
2374
  rocksdb_options_set_compaction_filter_factory(pTaskDb->dbOpt, pTaskDb->filterFactory);
4,679✔
2375
  pTaskDb->readOpt = rocksdb_readoptions_create();
4,679✔
2376
  pTaskDb->writeOpt = rocksdb_writeoptions_create();
4,677✔
2377
  rocksdb_writeoptions_disable_WAL(pTaskDb->writeOpt, 1);
4,677✔
2378

2379
  size_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
4,679✔
2380
  pTaskDb->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
4,679!
2381
  pTaskDb->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
4,679!
2382
  pTaskDb->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
4,679!
2383
  pTaskDb->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
4,679!
2384
  if (pTaskDb->pCf == NULL || pTaskDb->pCfParams == NULL || pTaskDb->pCfOpts == NULL || pTaskDb->pCompares == NULL) {
4,679!
UNCOV
2385
    stError("failed to alloc memory for cf");
×
2386
    taosMemoryFreeClear(pTaskDb->pCf);
×
2387
    taosMemoryFreeClear(pTaskDb->pCfParams);
×
2388
    taosMemoryFreeClear(pTaskDb->pCfOpts);
×
2389
    taosMemoryFreeClear(pTaskDb->pCompares);
×
2390
    return;
×
2391
  }
2392

2393
  for (int i = 0; i < nCf; i++) {
37,427✔
2394
    rocksdb_options_t*                   opt = rocksdb_options_create_copy(pTaskDb->dbOpt);
32,748✔
2395
    rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
32,753✔
2396
    rocksdb_block_based_options_set_block_cache(tableOpt, pTaskDb->cache);
32,752✔
2397
    rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
32,749✔
2398

2399
    rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
32,750✔
2400
    rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
32,753✔
2401

2402
    rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt);
32,752✔
2403

2404
    SCfInit* cfPara = &ginitDict[i];
32,753✔
2405

2406
    rocksdb_comparator_t* compare =
2407
        rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
32,753✔
2408
    rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
32,749✔
2409

2410
    rocksdb_compactionfilterfactory_t* filterFactory =
2411
        rocksdb_compactionfilterfactory_create(NULL, cfPara->destroyFilter, cfPara->createFilter, cfPara->funcName);
32,751✔
2412
    rocksdb_options_set_compaction_filter_factory(opt, filterFactory);
32,752✔
2413

2414
    pTaskDb->pCompares[i] = compare;
32,748✔
2415
    pTaskDb->pCfOpts[i] = opt;
32,748✔
2416
    pTaskDb->pCfParams[i].tableOpt = tableOpt;
32,748✔
2417
  }
2418
  return;
4,679✔
2419
}
2420
void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) {
4,679✔
2421
  pTaskDb->chkpId = -1;
4,679✔
2422
  pTaskDb->chkpCap = 4;
4,679✔
2423
  pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t));
4,679✔
2424
  TAOS_UNUSED(taskDbLoadChkpInfo(pTaskDb));
4,679✔
2425

2426
  pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t));
4,679✔
2427

2428
  TAOS_UNUSED(taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL));
4,679✔
2429
}
4,679✔
2430

2431
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
6✔
2432
  TAOS_UNUSED(taosThreadRwlockWrlock(&pTaskDb->chkpDirLock));
6✔
2433
  if (taosArrayPush(pTaskDb->chkpInUse, &chkp) == NULL) {
12!
2434
    stError("failed to push chkp: %" PRIi64 " into inuse", chkp);
×
2435
  }
2436
  taosArraySort(pTaskDb->chkpInUse, chkpIdComp);
6✔
2437
  TAOS_UNUSED(taosThreadRwlockUnlock(&pTaskDb->chkpDirLock));
6✔
2438
}
6✔
2439

UNCOV
2440
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
×
UNCOV
2441
  TAOS_UNUSED(taosThreadRwlockWrlock(&pTaskDb->chkpDirLock));
×
UNCOV
2442
  int32_t size = taosArrayGetSize(pTaskDb->chkpInUse);
×
UNCOV
2443
  for (int i = 0; i < size; i++) {
×
UNCOV
2444
    int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i);
×
UNCOV
2445
    if (*p == chkp) {
×
UNCOV
2446
      taosArrayRemove(pTaskDb->chkpInUse, i);
×
UNCOV
2447
      break;
×
2448
    }
2449
  }
UNCOV
2450
  TAOS_UNUSED(taosThreadRwlockUnlock(&pTaskDb->chkpDirLock));
×
UNCOV
2451
}
×
2452

2453
void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
4,515✔
2454
  taosArrayDestroy(pTaskDb->chkpSaved);
4,515✔
2455
  taosArrayDestroy(pTaskDb->chkpInUse);
4,515✔
2456
  TAOS_UNUSED(taosThreadRwlockDestroy(&pTaskDb->chkpDirLock));
4,515✔
2457
}
4,515✔
2458

2459
int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
×
2460
  int32_t code = 0;
×
2461
  int32_t cap = strlen(path) + 128, nBytes = 0;
×
2462
  char*   statePath = NULL;
×
2463
  char*   dbPath = NULL;
×
2464

2465
  statePath = taosMemoryCalloc(1, cap);
×
2466
  if (statePath == NULL) {
×
2467
    TAOS_CHECK_GOTO(terrno, NULL, _err);
×
2468
  }
2469

2470
  nBytes = snprintf(statePath, cap, "%s%s%s", path, TD_DIRSEP, key);
×
2471
  if (nBytes < 0 || nBytes >= cap) {
×
2472
    code = TSDB_CODE_OUT_OF_RANGE;
×
2473
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2474
  }
2475

2476
  if (!taosDirExist(statePath)) {
×
2477
    code = taosMulMkDir(statePath);
×
2478
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2479
  }
2480

2481
  dbPath = taosMemoryCalloc(1, cap);
×
2482
  if (dbPath == NULL) {
×
2483
    TAOS_CHECK_GOTO(terrno, NULL, _err);
×
2484
  }
2485
  nBytes = snprintf(dbPath, cap, "%s%s%s", statePath, TD_DIRSEP, "state");
×
2486
  if (nBytes < 0 || nBytes >= cap) {
×
2487
    code = TSDB_CODE_OUT_OF_RANGE;
×
2488
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2489
  }
2490

2491
  if (!taosDirExist(dbPath)) {
×
2492
    code = taosMulMkDir(dbPath);
×
2493
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2494
  }
2495

2496
  *dbFullPath = dbPath;
×
2497
  *stateFullPath = statePath;
×
2498
  return 0;
×
2499
_err:
×
2500
  stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code));
×
2501

2502
  taosMemoryFree(statePath);
×
2503
  taosMemoryFree(dbPath);
×
2504
  return code;
×
2505
}
2506

2507
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
×
2508
  STaskDbWrapper* p = pTaskDb;
×
2509
  TAOS_UNUSED(streamMutexLock(&p->mutex));
×
2510
  p->chkpId = chkpId;
×
2511
  TAOS_UNUSED(streamMutexUnlock(&p->mutex));
×
2512
}
×
2513

2514
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
4,679✔
2515
  char*   err = NULL;
4,679✔
2516
  char**  cfNames = NULL;
4,679✔
2517
  size_t  nCf = 0;
4,679✔
2518
  int32_t code = 0;
4,679✔
2519
  int32_t lino = 0;
4,679✔
2520

2521
  STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper));
4,679!
2522
  TSDB_CHECK_NULL(pTaskDb, code, lino, _EXIT, terrno);
4,679!
2523

2524
  pTaskDb->idstr = key ? taosStrdup(key) : NULL;
4,679!
2525
  pTaskDb->path = statePath ? taosStrdup(statePath) : NULL;
4,679!
2526

2527
  code = taosThreadMutexInit(&pTaskDb->mutex, NULL);
4,679✔
2528
  TSDB_CHECK_CODE(code, lino, _EXIT);
4,679!
2529

2530
  taskDbInitChkpOpt(pTaskDb);
4,679✔
2531
  taskDbInitOpt(pTaskDb);
4,679✔
2532

2533
  cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
4,679✔
2534
  if (nCf == 0) {
4,679✔
2535
    stInfo("%s newly create db in state-backend", key);
4,649✔
2536
    // pre create db
2537
    pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
4,650✔
2538
    if (pTaskDb->db == NULL) {
4,649!
2539
      stError("%s open state-backend failed, reason:%s", key, err);
×
2540
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
2541
      goto _EXIT;
×
2542
    }
2543

2544
    rocksdb_close(pTaskDb->db);
4,649✔
2545
    pTaskDb->db = NULL;
4,649✔
2546

2547
    if (cfNames != NULL) {
4,649!
2548
      rocksdb_list_column_families_destroy(cfNames, nCf);
4,649✔
2549
    }
2550

2551
    taosMemoryFree(err);
4,649!
2552
    err = NULL;
4,649✔
2553

2554
    cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
4,649✔
2555
    if (err != NULL) {
4,649!
2556
      stError("%s failed to create column-family, %s, %" PRIzu ", reason:%s", key, dbPath, nCf, err);
×
2557
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
2558
      goto _EXIT;
×
2559
    }
2560
  }
2561

2562
  if ((code = taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf)) != 0) {
4,679!
2563
    goto _EXIT;
×
2564
  }
2565

2566
  if (cfNames != NULL) {
4,679!
2567
    rocksdb_list_column_families_destroy(cfNames, nCf);
4,679✔
2568
    cfNames = NULL;
4,679✔
2569
  }
2570

2571
  stDebug("init s-task backend in:%s, backend:%p, %s", dbPath, pTaskDb, key);
4,679✔
2572
  return pTaskDb;
4,679✔
2573

2574
_EXIT:
×
2575
  stError("%s taskDb open failed, %s at line:%d code:%s", key, __func__, lino, tstrerror(code));
×
2576

2577
  taskDbDestroy(pTaskDb, false);
×
2578
  if (err) taosMemoryFree(err);
×
2579
  if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);
×
2580
  return NULL;
×
2581
}
2582

2583
int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb) {
4,677✔
2584
  char* statePath = NULL;
4,677✔
2585
  char* dbPath = NULL;
4,677✔
2586
  int   code = 0;
4,677✔
2587
  if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) {
4,677!
2588
    stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key,
×
2589
            chkptId, tstrerror(code));
2590
    return code;
×
2591
  }
2592

2593
  STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath);
4,679✔
2594
  if (pTaskDb != NULL) {
4,679!
2595
    int64_t chkpId = -1, ver = -1;
4,679✔
2596
    if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver)) == 0) {
4,679!
2597
      *processVer = ver;
4,679✔
2598
    } else {
2599
      stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId,
×
2600
              tstrerror(code));
2601
      taskDbDestroy(pTaskDb, false);
×
2602
      return code;
×
2603
    }
2604
  } else {
2605
    code = TSDB_CODE_INVALID_PARA;
×
2606
  }
2607

2608
  taosMemoryFree(dbPath);
4,679!
2609
  taosMemoryFree(statePath);
4,679!
2610
  *ppTaskDb = pTaskDb;
4,679✔
2611
  return code;
4,679✔
2612
}
2613

2614
void taskDbDestroy(void* pDb, bool flush) {
4,514✔
2615
  STaskDbWrapper* wrapper = pDb;
4,514✔
2616
  if (wrapper == NULL) return;
4,514!
2617

2618
  streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr);
4,514✔
2619

2620
  stDebug("succ to destroy stream backend:%p", wrapper);
4,515✔
2621

2622
  int8_t nCf = tListLen(ginitDict);
4,515✔
2623
  if (flush && wrapper->removeAllFiles == 0) {
4,515!
2624
    if (wrapper->db && wrapper->pCf) {
1,059!
2625
      rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
1,059✔
2626
      rocksdb_flushoptions_set_wait(flushOpt, 1);
1,059✔
2627

2628
      char*                            err = NULL;
1,059✔
2629
      rocksdb_column_family_handle_t** cfs = taosMemoryCalloc(1, sizeof(rocksdb_column_family_handle_t*) * nCf);
1,059!
2630
      int                              numOfFlushCf = 0;
1,059✔
2631
      for (int i = 0; i < nCf; i++) {
8,472✔
2632
        if (wrapper->pCf[i] != NULL) {
7,413✔
2633
          cfs[numOfFlushCf++] = wrapper->pCf[i];
2,690✔
2634
        }
2635
      }
2636
      if (numOfFlushCf != 0) {
1,059!
2637
        rocksdb_flush_cfs(wrapper->db, flushOpt, cfs, numOfFlushCf, &err);
1,059✔
2638
        if (err != NULL) {
1,059!
2639
          stError("failed to flush all cfs, reason:%s", err);
×
2640
          taosMemoryFreeClear(err);
×
2641
        }
2642
      }
2643
      taosMemoryFree(cfs);
1,059!
2644
      rocksdb_flushoptions_destroy(flushOpt);
1,059✔
2645
    }
2646
  }
2647

2648
  if (wrapper->pCf != NULL) {
4,515!
2649
    for (int i = 0; i < nCf; i++) {
36,117✔
2650
      if (wrapper->pCf[i] != NULL) {
31,602✔
2651
        rocksdb_column_family_handle_destroy(wrapper->pCf[i]);
10,568✔
2652
      }
2653
    }
2654
  }
2655

2656
  if (wrapper->db) {
4,515!
2657
    rocksdb_close(wrapper->db);
4,515✔
2658
    wrapper->db = NULL;
4,515✔
2659
  }
2660

2661
  rocksdb_options_destroy(wrapper->dbOpt);
4,515✔
2662
  rocksdb_readoptions_destroy(wrapper->readOpt);
4,515✔
2663
  rocksdb_writeoptions_destroy(wrapper->writeOpt);
4,515✔
2664
  rocksdb_env_destroy(wrapper->env);
4,515✔
2665
  rocksdb_cache_destroy(wrapper->cache);
4,515✔
2666

2667
  taosMemoryFree(wrapper->pCf);
4,515!
2668
  for (int i = 0; i < nCf; i++) {
36,118✔
2669
    rocksdb_options_t*                   opt = wrapper->pCfOpts[i];
31,603✔
2670
    rocksdb_comparator_t*                compare = wrapper->pCompares[i];
31,603✔
2671
    rocksdb_block_based_table_options_t* tblOpt = wrapper->pCfParams[i].tableOpt;
31,603✔
2672

2673
    rocksdb_options_destroy(opt);
31,603✔
2674
    rocksdb_comparator_destroy(compare);
31,605✔
2675
    rocksdb_block_based_options_destroy(tblOpt);
31,605✔
2676
  }
2677
  taosMemoryFree(wrapper->pCompares);
4,515!
2678
  taosMemoryFree(wrapper->pCfOpts);
4,515!
2679
  taosMemoryFree(wrapper->pCfParams);
4,515!
2680

2681
  streamMutexDestroy(&wrapper->mutex);
4,515✔
2682

2683
  taskDbDestroyChkpOpt(wrapper);
4,515✔
2684

2685
  taosMemoryFree(wrapper->idstr);
4,515!
2686

2687
  if (wrapper->removeAllFiles) {
4,515✔
2688
    char* err = NULL;
3,456✔
2689
    stInfo("drop task remove backend dat:%s", wrapper->path);
3,456!
2690
    taosRemoveDir(wrapper->path);
3,456✔
2691
  }
2692
  taosMemoryFree(wrapper->path);
4,515!
2693
  taosMemoryFree(wrapper);
4,514!
2694

2695
  return;
4,515✔
2696
}
2697

2698
void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); }
4,514✔
2699

2700
int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
×
2701
  int32_t code = 0;
×
2702
  int64_t refId = pDb->refId;
×
2703
  int32_t nBytes = 0;
×
2704

2705
  if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
×
2706
    code = terrno;
×
2707
    return code;
×
2708
  }
2709

2710
  int32_t cap = strlen(pDb->path) + 128;
×
2711

2712
  char* buf = taosMemoryCalloc(1, cap);
×
2713
  if (buf == NULL) {
×
2714
    TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
×
2715
    return terrno;
×
2716
  }
2717

2718
  nBytes =
2719
      snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
×
2720
  if (nBytes <= 0 || nBytes >= cap) {
×
2721
    taosMemoryFree(buf);
×
2722
    TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
×
2723
    return TSDB_CODE_OUT_OF_RANGE;
×
2724
  }
2725

2726
  if (taosIsDir(buf)) {
×
2727
    code = 0;
×
2728
    *path = buf;
×
2729
  } else {
2730
    taosMemoryFree(buf);
×
2731
  }
2732

2733
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
×
2734
  return code;
×
2735
}
2736

2737
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list,
×
2738
                                    const char* idStr) {
2739
  int32_t  code = 0;
×
2740
  int32_t  cap = strlen(pDb->path) + 32;
×
2741
  SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
×
2742

2743
  char* temp = taosMemoryCalloc(1, cap);
×
2744
  if (temp == NULL) {
×
2745
    return terrno;
×
2746
  }
2747

2748
  int32_t nBytes = snprintf(temp, cap, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId);
×
2749
  if (nBytes <= 0 || nBytes >= cap) {
×
2750
    taosMemoryFree(temp);
×
2751
    return TSDB_CODE_OUT_OF_RANGE;
×
2752
  }
2753

2754
  if (taosDirExist(temp)) {
×
2755
    cleanDir(temp, idStr);
×
2756
  } else {
2757
    code = taosMkDir(temp);
×
2758
    if (code != 0) {
×
2759
      taosMemoryFree(temp);
×
2760
      return TAOS_SYSTEM_ERROR(errno);
×
2761
    }
2762
  }
2763

2764
  code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp);
×
2765
  *path = temp;
×
2766

2767
  return code;
×
2768
}
2769

2770
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list,
×
2771
                                const char* idStr) {
2772
  int32_t                 code = -1;
×
2773
  STaskDbWrapper*         pDb = arg;
×
2774
  ECHECKPOINT_BACKUP_TYPE utype = type;
×
2775

2776
  taskDbRefChkp(pDb, chkpId);
×
2777
  if (utype == DATA_UPLOAD_RSYNC) {
×
2778
    code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
×
2779
  } else if (utype == DATA_UPLOAD_S3) {
×
2780
    code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list, idStr);
×
2781
  }
2782
  taskDbUnRefChkp(pDb, chkpId);
×
2783
  return code;
×
2784
}
2785

2786
int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) {
×
2787
  int32_t code = 0;
×
2788
  char*   err = NULL;
×
2789
  int8_t  idx = getCfIdx(key);
×
2790

2791
  if (idx == -1) return -1;
×
2792

2793
  if (pDb->pCf[idx] != NULL) return code;
×
2794

2795
  rocksdb_column_family_handle_t* cf =
2796
      rocksdb_create_column_family(pDb->db, pDb->pCfOpts[idx], ginitDict[idx].key, &err);
×
2797
  if (err != NULL) {
×
2798
    stError("failed to open cf, key:%s, reason: %s", key, err);
×
2799
    taosMemoryFree(err);
×
2800
    code = -1;
×
2801
    return code;
×
2802
  }
2803

2804
  pDb->pCf[idx] = cf;
×
2805
  return code;
×
2806
}
2807
int32_t copyDataAt(RocksdbCfInst* pSrc, STaskDbWrapper* pDst, int8_t i) {
×
2808
  int32_t WRITE_BATCH = 1024;
×
2809
  char*   err = NULL;
×
2810
  int     code = 0;
×
2811

2812
  rocksdb_readoptions_t* pRdOpt = rocksdb_readoptions_create();
×
2813

2814
  rocksdb_writebatch_t* wb = rocksdb_writebatch_create();
×
2815
  rocksdb_iterator_t*   pIter = rocksdb_create_iterator_cf(pSrc->db, pRdOpt, pSrc->pHandle[i]);
×
2816
  rocksdb_iter_seek_to_first(pIter);
×
2817
  while (rocksdb_iter_valid(pIter)) {
×
2818
    if (rocksdb_writebatch_count(wb) >= WRITE_BATCH) {
×
2819
      rocksdb_write(pDst->db, pDst->writeOpt, wb, &err);
×
2820
      if (err != NULL) {
×
2821
        code = -1;
×
2822
        goto _EXIT;
×
2823
      }
2824
      rocksdb_writebatch_clear(wb);
×
2825
    }
2826

2827
    size_t klen = 0, vlen = 0;
×
2828
    char*  key = (char*)rocksdb_iter_key(pIter, &klen);
×
2829
    char*  val = (char*)rocksdb_iter_value(pIter, &vlen);
×
2830

2831
    rocksdb_writebatch_put_cf(wb, pDst->pCf[i], key, klen, val, vlen);
×
2832
    rocksdb_iter_next(pIter);
×
2833
  }
2834

2835
  if (rocksdb_writebatch_count(wb) > 0) {
×
2836
    rocksdb_write(pDst->db, pDst->writeOpt, wb, &err);
×
2837
    if (err != NULL) {
×
2838
      code = -1;
×
2839
      goto _EXIT;
×
2840
    }
2841
  }
2842

2843
_EXIT:
×
2844
  rocksdb_writebatch_destroy(wb);
×
2845
  rocksdb_iter_destroy(pIter);
×
2846
  rocksdb_readoptions_destroy(pRdOpt);
×
2847
  taosMemoryFree(err);
×
2848

2849
  return code;
×
2850
}
2851

2852
int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) {
×
2853
  int nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
×
2854

2855
  int32_t code = 0;
×
2856

2857
  int64_t         processVer = -1;
×
2858
  STaskDbWrapper* pTaskDb = NULL;
×
2859

2860
  code = taskDbOpen(path, key, 0, &processVer, &pTaskDb);
×
2861
  RocksdbCfInst* pSrcBackend = pCfInst;
×
2862

2863
  for (int i = 0; i < nCf; i++) {
×
2864
    rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i];
×
2865
    if (pSrcCf == NULL) continue;
×
2866

2867
    code = taskDbOpenCfByKey(pTaskDb, ginitDict[i].key);
×
2868
    if (code != 0) goto _EXIT;
×
2869

2870
    code = copyDataAt(pSrcBackend, pTaskDb, i);
×
2871
    if (code != 0) goto _EXIT;
×
2872
  }
2873

2874
_EXIT:
×
2875
  taskDbDestroy(pTaskDb, true);
×
2876

2877
  return code;
×
2878
}
2879
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) {
×
2880
  SBackendWrapper* handle = backend;
×
2881
  char*            err = NULL;
×
2882
  int64_t          streamId;
2883
  int32_t          taskId, dummy = 0;
×
2884
  char             suffix[64] = {0};
×
2885
  int32_t          code = 0;
×
2886

2887
  rocksdb_options_t**              cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
×
2888
  RocksdbCfParam*                  params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
×
2889
  rocksdb_comparator_t**           pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
×
2890
  rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
×
2891

2892
  for (int i = 0; i < nCf; i++) {
×
2893
    char* cf = cfs[i];
×
2894
    char  funcname[64] = {0};
×
2895

2896
    cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt);
×
2897
    if (i == 0) continue;
×
2898
    if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
×
2899
      rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
×
2900
      rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
×
2901
      rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
×
2902

2903
      rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
×
2904
      rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
×
2905

2906
      rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt);
×
2907
      params[i].tableOpt = tableOpt;
×
2908

2909
      int idx = streamStateGetCfIdx(NULL, funcname);
×
2910
      if (idx < 0 || idx >= sizeof(ginitDict) / sizeof(ginitDict[0])) {
×
2911
        stError("failed to open cf");
×
2912
        return -1;
×
2913
      }
2914
      SCfInit* cfPara = &ginitDict[idx];
×
2915

2916
      rocksdb_comparator_t* compare =
2917
          rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
×
2918
      rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare);
×
2919
      pCompare[i] = compare;
×
2920
    }
2921
  }
2922
  rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nCf, (const char* const*)cfs,
×
2923
                                               (const rocksdb_options_t* const*)cfOpts, cfHandle, &err);
2924
  if (err != NULL) {
×
2925
    stError("failed to open rocksdb cf, reason:%s", err);
×
2926
    taosMemoryFree(err);
×
2927
    taosMemoryFree(cfHandle);
×
2928
    taosMemoryFree(pCompare);
×
2929
    taosMemoryFree(params);
×
2930
    taosMemoryFree(cfOpts);
×
2931
    // fix other leak
2932
    return TSDB_CODE_THIRDPARTY_ERROR;
×
2933
  } else {
2934
    stDebug("succ to open rocksdb cf");
×
2935
  }
2936
  // close default cf
2937
  if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) {
×
2938
    rocksdb_column_family_handle_destroy(cfHandle[0]);
×
2939
    cfHandle[0] = NULL;
×
2940
  }
2941
  rocksdb_options_destroy(cfOpts[0]);
×
2942

2943
  handle->db = db;
×
2944

2945
  static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
2946
  for (int i = 0; i < nCf; i++) {
×
2947
    char* cf = cfs[i];
×
2948
    if (i == 0) continue;  // skip default column family, not set opt
×
2949

2950
    char funcname[64] = {0};
×
2951
    if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
×
2952
      char    idstr[128] = {0};
×
2953
      int32_t nBytes = snprintf(idstr, sizeof(idstr), "0x%" PRIx64 "-%d", streamId, taskId);
×
2954
      if (nBytes <= 0 || nBytes >= sizeof(idstr)) {
×
2955
        code = TSDB_CODE_OUT_OF_RANGE;
×
2956
        stError("failed to open cf since %s", tstrerror(code));
×
2957
        return code;
×
2958
      }
2959

2960
      int idx = streamStateGetCfIdx(NULL, funcname);
×
2961

2962
      RocksdbCfInst*  inst = NULL;
×
2963
      RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1);
×
2964
      if (pInst == NULL || *pInst == NULL) {
×
2965
        inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst));
×
2966
        inst->pHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
×
2967
        inst->cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
×
2968
        inst->wOpt = rocksdb_writeoptions_create();
×
2969
        inst->rOpt = rocksdb_readoptions_create();
×
2970
        inst->param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
×
2971
        inst->pBackend = handle;
×
2972
        inst->db = db;
×
2973
        inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
×
2974

2975
        inst->dbOpt = handle->dbOpt;
×
2976
        rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
×
2977
        TAOS_UNUSED(taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)));
×
2978
      } else {
2979
        inst = *pInst;
×
2980
      }
2981
      inst->cfOpt[idx] = cfOpts[i];
×
2982
      inst->pCompares[idx] = pCompare[i];
×
2983
      memcpy(&(inst->param[idx]), &(params[i]), sizeof(RocksdbCfParam));
×
2984
      inst->pHandle[idx] = cfHandle[i];
×
2985
    }
2986
  }
2987
  void* pIter = taosHashIterate(handle->cfInst, NULL);
×
2988
  while (pIter) {
×
2989
    RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
×
2990

2991
    for (int i = 0; i < cfLen; i++) {
×
2992
      if (inst->cfOpt[i] == NULL) {
×
2993
        rocksdb_options_t*                   opt = rocksdb_options_create_copy(handle->dbOpt);
×
2994
        rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
×
2995
        rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
×
2996
        rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
×
2997

2998
        rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
×
2999
        rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
×
3000

3001
        rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt);
×
3002

3003
        SCfInit* cfPara = &ginitDict[i];
×
3004

3005
        rocksdb_comparator_t* compare =
3006
            rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
×
3007
        rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
×
3008

3009
        inst->pCompares[i] = compare;
×
3010
        inst->cfOpt[i] = opt;
×
3011
        inst->param[i].tableOpt = tableOpt;
×
3012
      }
3013
    }
3014
    SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen};
×
3015
    inst->pCompareNode = streamBackendAddCompare(handle, &compare);
×
3016
    pIter = taosHashIterate(handle->cfInst, pIter);
×
3017
  }
3018

3019
  taosMemoryFree(cfHandle);
×
3020
  taosMemoryFree(pCompare);
×
3021
  taosMemoryFree(params);
×
3022
  taosMemoryFree(cfOpts);
×
3023
  return 0;
×
3024
}
3025

3026
void streamStateDestroyCompar(void* arg) {
×
3027
  SCfComparator* comp = (SCfComparator*)arg;
×
3028
  for (int i = 0; i < comp->numOfComp; i++) {
×
3029
    if (comp->comp[i]) rocksdb_comparator_destroy(comp->comp[i]);
×
3030
  }
3031
  taosMemoryFree(comp->comp);
×
3032
}
×
3033

3034
int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
800,153✔
3035
  int    idx = -1;
800,153✔
3036
  size_t len = strlen(funcName);
800,153✔
3037
  for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
3,907,806!
3038
    if (len == ginitDict[i].len && strncmp(funcName, ginitDict[i].key, strlen(funcName)) == 0) {
3,908,702✔
3039
      idx = i;
801,049✔
3040
      break;
801,049✔
3041
    }
3042
  }
3043
  if (pState != NULL && idx != -1) {
800,153!
3044
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
801,037✔
3045
    if (wrapper == NULL) {
801,037!
3046
      return -1;
×
3047
    }
3048

3049
    streamMutexLock(&wrapper->mutex);
801,037✔
3050

3051
    rocksdb_column_family_handle_t* cf = wrapper->pCf[idx];
801,098✔
3052
    if (cf == NULL) {
801,098✔
3053
      char* err = NULL;
6,260✔
3054
      cf = rocksdb_create_column_family(wrapper->db, wrapper->pCfOpts[idx], ginitDict[idx].key, &err);
6,260✔
3055
      if (err != NULL) {
6,262!
3056
        idx = -1;
×
3057
        stError("failed to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err);
×
3058
        rocksdb_column_family_handle_destroy(cf);
×
3059
        taosMemoryFree(err);
×
3060
      } else {
3061
        stDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName);
6,262✔
3062
        wrapper->pCf[idx] = cf;
6,262✔
3063
      }
3064
    }
3065
    streamMutexUnlock(&wrapper->mutex);
801,100✔
3066
  }
3067

3068
  return idx;
801,096✔
3069
}
3070
bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) {
16,163✔
3071
  rocksdb_iter_seek(iter, buf, len);
16,163✔
3072
  if (!rocksdb_iter_valid(iter)) {
16,165✔
3073
    rocksdb_iter_seek_for_prev(iter, buf, len);
9,743✔
3074
    if (!rocksdb_iter_valid(iter)) {
9,743✔
3075
      return false;
8,528✔
3076
    }
3077
  }
3078
  return true;
7,637✔
3079
}
3080
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKeyName, rocksdb_snapshot_t** snapshot,
21,883✔
3081
                                          rocksdb_readoptions_t** readOpt) {
3082
  int idx = streamStateGetCfIdx(pState, cfKeyName);
21,883✔
3083

3084
  *readOpt = rocksdb_readoptions_create();
21,886✔
3085

3086
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
21,881✔
3087
  if (snapshot != NULL) {
21,881!
3088
    *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->db);
21,882✔
3089
    rocksdb_readoptions_set_snapshot(*readOpt, *snapshot);
21,887✔
3090
    rocksdb_readoptions_set_fill_cache(*readOpt, 0);
21,887✔
3091
  }
3092

3093
  return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]);
21,886✔
3094
}
3095

3096
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen)                                              \
3097
  do {                                                                                                            \
3098
    code = 0;                                                                                                     \
3099
    char  buf[128] = {0};                                                                                         \
3100
    char* err = NULL;                                                                                             \
3101
    int   i = streamStateGetCfIdx(pState, funcname);                                                              \
3102
    if (i < 0) {                                                                                                  \
3103
      stWarn("streamState failed to get cf name: %s", funcname);                                                  \
3104
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                          \
3105
      break;                                                                                                      \
3106
    }                                                                                                             \
3107
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;                                                \
3108
    TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));                                                   \
3109
    char toString[128] = {0};                                                                                     \
3110
    if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString)));                   \
3111
    int32_t                         klen = ginitDict[i].enFunc((void*)key, buf);                                  \
3112
    rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
3113
    rocksdb_writeoptions_t*         opts = wrapper->writeOpt;                                                     \
3114
    rocksdb_t*                      db = wrapper->db;                                                             \
3115
    char*                           ttlV = NULL;                                                                  \
3116
    int32_t                         ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV);             \
3117
    rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err);          \
3118
    if (err != NULL) {                                                                                            \
3119
      stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err);                     \
3120
      taosMemoryFree(err);                                                                                        \
3121
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                          \
3122
    } else {                                                                                                      \
3123
      stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \
3124
              ttlVLen, wrapper);                                                                                  \
3125
    }                                                                                                             \
3126
    taosMemoryFree(ttlV);                                                                                         \
3127
  } while (0);
3128

3129
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen)                                                   \
3130
  do {                                                                                                                \
3131
    code = 0;                                                                                                         \
3132
    char  buf[128] = {0};                                                                                             \
3133
    char* err = NULL;                                                                                                 \
3134
    int   i = streamStateGetCfIdx(pState, funcname);                                                                  \
3135
    if (i < 0) {                                                                                                      \
3136
      stWarn("streamState failed to get cf name: %s", funcname);                                                      \
3137
      code = -1;                                                                                                      \
3138
      break;                                                                                                          \
3139
    }                                                                                                                 \
3140
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;                                                    \
3141
    char            toString[128] = {0};                                                                              \
3142
    if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString)));                       \
3143
    int32_t                         klen = ginitDict[i].enFunc((void*)key, buf);                                      \
3144
    rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx];     \
3145
    rocksdb_t*                      db = wrapper->db;                                                                 \
3146
    rocksdb_readoptions_t*          opts = wrapper->readOpt;                                                          \
3147
    size_t                          len = 0;                                                                          \
3148
    char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err);                       \
3149
    if (val == NULL || len == 0) {                                                                                    \
3150
      if (err == NULL) {                                                                                              \
3151
        stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
3152
      } else {                                                                                                        \
3153
        stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err);   \
3154
        taosMemoryFreeClear(err);                                                                                     \
3155
      }                                                                                                               \
3156
      code = -1;                                                                                                      \
3157
    } else {                                                                                                          \
3158
      char*   p = NULL;                                                                                               \
3159
      int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal);                                          \
3160
      if (tlen <= 0) {                                                                                                \
3161
        stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr,         \
3162
                funcname);                                                                                            \
3163
        code = -1;                                                                                                    \
3164
      } else {                                                                                                        \
3165
        stTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname,     \
3166
                tlen, wrapper);                                                                                       \
3167
      }                                                                                                               \
3168
      taosMemoryFree(val);                                                                                            \
3169
      if (vLen != NULL) *vLen = tlen;                                                                                 \
3170
    }                                                                                                                 \
3171
  } while (0);
3172

3173
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key)                                                           \
3174
  do {                                                                                                            \
3175
    code = 0;                                                                                                     \
3176
    char  buf[128] = {0};                                                                                         \
3177
    char* err = NULL;                                                                                             \
3178
    int   i = streamStateGetCfIdx(pState, funcname);                                                              \
3179
    if (i < 0) {                                                                                                  \
3180
      stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname);                     \
3181
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                          \
3182
      break;                                                                                                      \
3183
    }                                                                                                             \
3184
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;                                                \
3185
    TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));                                                   \
3186
    char toString[128] = {0};                                                                                     \
3187
    if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED(ginitDict[i].toStrFunc((void*)key, toString));                     \
3188
    int32_t                         klen = ginitDict[i].enFunc((void*)key, buf);                                  \
3189
    rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
3190
    rocksdb_t*                      db = wrapper->db;                                                             \
3191
    rocksdb_writeoptions_t*         opts = wrapper->writeOpt;                                                     \
3192
    rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err);                                           \
3193
    if (err != NULL) {                                                                                            \
3194
      stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err);  \
3195
      taosMemoryFree(err);                                                                                        \
3196
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                          \
3197
    } else {                                                                                                      \
3198
      stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname);                  \
3199
    }                                                                                                             \
3200
  } while (0);
3201

3202
// state cf
3203
int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
201✔
3204
  int code = 0;
201✔
3205

3206
  SStateKey sKey = {.key = *key, .opNum = pState->number};
201✔
3207
  char*     dst = NULL;
201✔
3208
  size_t    size = 0;
201✔
3209
  if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
201!
3210
    STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, (int32_t)vLen);
201!
3211
  } else {
3212
    code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
×
3213
    if (code != 0) {
×
3214
      return code;
×
3215
    }
3216
    STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size);
×
3217
    taosMemoryFree(dst);
×
3218
  }
3219
  return code;
201✔
3220
}
3221
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
154,928✔
3222
  int       code = 0;
154,928✔
3223
  SStateKey sKey = {.key = *key, .opNum = pState->number};
154,928✔
3224

3225
  char*  tVal = NULL;
154,928✔
3226
  size_t tValLen = 0;
154,928✔
3227
  STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, &tVal, &tValLen);
155,169!
3228
  if (code != 0) {
154,928✔
3229
    taosMemoryFree(tVal);
154,687!
3230
    return code;
154,687✔
3231
  }
3232
  if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) {
241!
3233
    *pVal = tVal;
100✔
3234
    *pVLen = tValLen;
100✔
3235
    return code;
100✔
3236
  }
3237
  size_t pValLen = 0;
141✔
3238
  code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen);
141✔
3239
  *pVLen = (int32_t)pValLen;
141✔
3240
  taosMemoryFree(tVal);
141!
3241
  return code;
141✔
3242
}
3243
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
3,657✔
3244
  int       code = 0;
3,657✔
3245
  SStateKey sKey = {.key = *key, .opNum = pState->number};
3,657✔
3246
  STREAM_STATE_DEL_ROCKSDB(pState, "state", &sKey);
3,657!
3247
  return code;
3,657✔
3248
}
3249
int32_t streamStateClear_rocksdb(SStreamState* pState) {
1✔
3250
  stDebug("streamStateClear_rocksdb");
1!
3251

3252
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
1✔
3253
  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));
1✔
3254

3255
  char      sKeyStr[128] = {0};
1✔
3256
  char      eKeyStr[128] = {0};
1✔
3257
  SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
1✔
3258
  SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number};
1✔
3259

3260
  int sLen = stateKeyEncode(&sKey, sKeyStr);
1✔
3261
  int eLen = stateKeyEncode(&eKey, eKeyStr);
1✔
3262

3263
  if (wrapper->pCf[1] != NULL) {
1!
3264
    char* err = NULL;
1✔
3265
    rocksdb_delete_range_cf(wrapper->db, wrapper->writeOpt, wrapper->pCf[1], sKeyStr, sLen, eKeyStr, eLen, &err);
1✔
3266
    if (err != NULL) {
1!
3267
      char toStringStart[128] = {0};
×
3268
      char toStringEnd[128] = {0};
×
3269
      TAOS_UNUSED(stateKeyToString(&sKey, toStringStart));
×
3270
      TAOS_UNUSED(stateKeyToString(&eKey, toStringEnd));
×
3271

3272
      stWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
×
3273
      taosMemoryFree(err);
×
3274
    } else {
3275
      rocksdb_compact_range_cf(wrapper->db, wrapper->pCf[1], sKeyStr, sLen, eKeyStr, eLen);
1✔
3276
    }
3277
  }
3278

3279
  return 0;
1✔
3280
}
3281
void streamStateCurNext_rocksdb(SStreamStateCur* pCur) {
4,174✔
3282
  if (pCur && pCur->iter && rocksdb_iter_valid(pCur->iter)) {
4,174!
3283
    rocksdb_iter_next(pCur->iter);
4,091✔
3284
  }
3285
}
4,174✔
3286
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
1✔
3287
  int code = 0;
1✔
3288
  stDebug("streamStateGetFirst_rocksdb");
1!
3289
  SWinKey tmp = {.ts = 0, .groupId = 0};
1✔
3290
  code = streamStatePut_rocksdb(pState, &tmp, NULL, 0);
1✔
3291
  if (code != 0) {
1!
3292
    return code;
×
3293
  }
3294

3295
  SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
1✔
3296
  code = streamStateGetKVByCur_rocksdb(pState, pCur, key, NULL, 0);
1✔
3297
  if (code != 0) {
1!
3298
    return code;
×
3299
  }
3300
  streamStateFreeCur(pCur);
1✔
3301
  return streamStateDel_rocksdb(pState, &tmp);
1✔
3302
}
3303

3304
int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
4,585✔
3305
                                               int32_t* pVLen) {
3306
  if (!pCur) {
4,585✔
3307
    return -1;
24✔
3308
  }
3309
  uint64_t groupId = pKey->groupId;
4,561✔
3310

3311
  int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
4,561✔
3312
  if (code == 0) {
4,561✔
3313
    if (pKey->groupId == groupId) {
2,950✔
3314
      return 0;
2,928✔
3315
    }
3316
    if (pVal != NULL) {
22✔
3317
      taosMemoryFree((void*)*pVal);
20!
3318
      *pVal = NULL;
20✔
3319
    }
3320
  }
3321
  return -1;
1,633✔
3322
}
3323
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
×
3324
  stDebug("streamStateAddIfNotExist_rocksdb");
×
3325
  int32_t size = *pVLen;
×
3326
  if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) {
×
3327
    return 0;
×
3328
  }
3329
  *pVal = taosMemoryMalloc(size);
×
3330
  if (*pVal == NULL) {
×
3331
    return terrno;
×
3332
  }
3333
  memset(*pVal, 0, size);
×
3334
  return 0;
×
3335
}
3336
void streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
×
3337
  if (pCur) {
×
3338
    rocksdb_iter_prev(pCur->iter);
×
3339
  }
3340
}
×
3341
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
4,030✔
3342
                                      int32_t* pVLen) {
3343
  if (!pCur) return -1;
4,030✔
3344
  SStateKey  tkey;
3345
  SStateKey* pKtmp = &tkey;
16✔
3346

3347
  if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {
16!
3348
    size_t tlen;
3349
    char*  keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
16✔
3350
    TAOS_UNUSED(stateKeyDecode((void*)pKtmp, keyStr));
16✔
3351
    if (pKtmp->opNum != pCur->number) {
16!
3352
      return -1;
×
3353
    }
3354

3355
    if (pVLen != NULL) {
16✔
3356
      size_t      vlen = 0;
15✔
3357
      const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
15✔
3358
      char*       val = NULL;
15✔
3359
      int32_t     len = valueDecode((void*)valStr, vlen, NULL, (char**)val);
15✔
3360
      if (len <= 0) {
15!
3361
        taosMemoryFree(val);
×
3362
        return -1;
15✔
3363
      }
3364

3365
      char*  tVal = val;
15✔
3366
      size_t tVlen = len;
15✔
3367

3368
      if (pVal != NULL) {
15!
3369
        if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
15!
3370
          int code =
3371
              (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen);
15✔
3372
          if (code != 0) {
15!
3373
            taosMemoryFree(val);
15!
3374
            return code;
15✔
3375
          }
3376
          taosMemoryFree(val);
×
3377
          *pVal = (char*)tVal;
×
3378
        } else {
3379
          stInfo("streamStateGetKVByCur_rocksdb, pState = %p, pResultRowStore = %p, pExprSupp = %p", pState,
×
3380
                 pState->pResultRowStore.resultRowGet, pState->pExprSupp);
3381
          *pVal = (char*)tVal;
×
3382
        }
3383
      } else {
3384
        taosMemoryFree(val);
×
3385
      }
3386
      *pVLen = (int32_t)tVlen;
×
3387
    }
3388

3389
    *pKey = pKtmp->key;
1✔
3390
    return 0;
1✔
3391
  }
3392
  return -1;
×
3393
}
3394
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
305✔
3395
  SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
305✔
3396
  if (pCur) {
306✔
3397
    int32_t code = streamStateFillGetGroupKVByCur_rocksdb(pCur, key, NULL, 0);
243✔
3398
    if (code == 0) return pCur;
242!
3399
    streamStateFreeCur(pCur);
×
3400
  }
3401
  return NULL;
63✔
3402
}
3403

3404
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
2✔
3405
  SStreamStateCur* pCur = createStreamStateCursor();
2✔
3406
  if (pCur == NULL) {
2!
3407
    return NULL;
×
3408
  }
3409
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
2✔
3410
  pCur->number = pState->number;
2✔
3411
  pCur->db = wrapper->db;
2✔
3412
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
4✔
3413
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
2✔
3414

3415
  SStateKey sKey = {.key = *key, .opNum = pState->number};
2✔
3416
  char      buf[128] = {0};
2✔
3417
  int       len = stateKeyEncode((void*)&sKey, buf);
2✔
3418
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
2!
3419
    streamStateFreeCur(pCur);
×
3420
    return NULL;
×
3421
  }
3422
  // skip ttl expired data
3423
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
2!
3424
    rocksdb_iter_next(pCur->iter);
×
3425
  }
3426

3427
  if (rocksdb_iter_valid(pCur->iter)) {
2!
3428
    SStateKey curKey;
3429
    size_t    kLen;
3430
    char*     keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
2✔
3431
    TAOS_UNUSED(stateKeyDecode((void*)&curKey, keyStr));
2✔
3432
    if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) {
2!
3433
      return pCur;
×
3434
    }
3435
    rocksdb_iter_next(pCur->iter);
2✔
3436
    return pCur;
2✔
3437
  }
3438
  streamStateFreeCur(pCur);
×
3439
  return NULL;
×
3440
}
3441

3442
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
4,028✔
3443
  int32_t code = 0;
4,028✔
3444

3445
  const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
4,028✔
3446
  STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
4,028!
3447
  if (code != 0) {
4,028!
3448
    return NULL;
×
3449
  }
3450

3451
  {
3452
    char tbuf[256] = {0};
4,028✔
3453
    TAOS_UNUSED(stateKeyToString((void*)&maxStateKey, tbuf));
4,028✔
3454
    stDebug("seek to last:%s", tbuf);
4,028✔
3455
  }
3456

3457
  SStreamStateCur* pCur = createStreamStateCursor();
4,028✔
3458
  if (pCur == NULL) return NULL;
4,028!
3459

3460
  pCur->number = pState->number;
4,028✔
3461
  pCur->db = ((STaskDbWrapper*)pState->pTdbState->pOwner->pBackend)->db;
4,028✔
3462
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
8,056✔
3463
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
4,028✔
3464

3465
  char    buf[128] = {0};
4,028✔
3466
  int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
4,028✔
3467
  rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
4,028✔
3468
  rocksdb_iter_prev(pCur->iter);
4,028✔
3469
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
4,028!
3470
    rocksdb_iter_prev(pCur->iter);
×
3471
  }
3472

3473
  if (!rocksdb_iter_valid(pCur->iter)) {
4,028✔
3474
    streamStateFreeCur(pCur);
4,014✔
3475
    pCur = NULL;
4,014✔
3476
  }
3477

3478
  STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
4,028!
3479
  return pCur;
4,028✔
3480
}
3481

3482
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
×
3483
  stDebug("streamStateGetCur_rocksdb");
×
3484
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
×
3485

3486
  SStreamStateCur* pCur = createStreamStateCursor();
×
3487
  if (pCur == NULL) return NULL;
×
3488

3489
  pCur->db = wrapper->db;
×
3490
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
×
3491
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
×
3492
  pCur->number = pState->number;
×
3493

3494
  SStateKey sKey = {.key = *key, .opNum = pState->number};
×
3495
  char      buf[128] = {0};
×
3496
  int       len = stateKeyEncode((void*)&sKey, buf);
×
3497

3498
  rocksdb_iter_seek(pCur->iter, buf, len);
×
3499

3500
  if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {
×
3501
    SStateKey curKey;
3502
    size_t    kLen = 0;
×
3503
    char*     keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
×
3504
    TAOS_UNUSED(stateKeyDecode((void*)&curKey, keyStr));
×
3505

3506
    if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) {
×
3507
      pCur->number = pState->number;
×
3508
      return pCur;
×
3509
    }
3510
  }
3511
  streamStateFreeCur(pCur);
×
3512
  return NULL;
×
3513
}
3514

3515
// func cf
3516
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
100✔
3517
  int    code = 0;
100✔
3518
  char*  dst = NULL;
100✔
3519
  size_t size = 0;
100✔
3520
  if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
100!
3521
    STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, (int32_t)vLen);
100!
3522
    return code;
100✔
3523
  }
3524
  code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
×
3525
  if (code != 0) {
×
3526
    return code;
×
3527
  }
3528
  STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, (int32_t)size);
×
3529
  taosMemoryFree(dst);
×
3530

3531
  return code;
×
3532
}
3533
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
100✔
3534
  int    code = 0;
100✔
3535
  char*  tVal = NULL;
100✔
3536
  size_t tValLen = 0;
100✔
3537
  STREAM_STATE_GET_ROCKSDB(pState, "func", key, tVal, &tValLen);
200!
3538
  if (code != 0) {
100!
3539
    taosMemoryFree(tVal);
×
3540
    return code;
×
3541
  }
3542

3543
  if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) {
100!
3544
    *pVal = tVal;
100✔
3545
    *pVLen = tValLen;
100✔
3546
    return code;
100✔
3547
  }
3548

3549
  size_t pValLen = 0;
×
3550
  code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen);
×
3551
  *pVLen = (int32_t)pValLen;
×
3552

3553
  taosMemoryFree(tVal);
×
3554
  return code;
×
3555
}
3556
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
100✔
3557
  int code = 0;
100✔
3558
  STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
100!
3559
  return 0;
100✔
3560
}
3561

3562
// session cf
3563
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
1,285✔
3564
  int              code = 0;
1,285✔
3565
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
1,285✔
3566
  if (value == NULL || vLen == 0) {
1,285!
3567
    stError("streamStateSessionPut_rocksdb val: %p, len: %d", value, vLen);
×
3568
  }
3569
  char*  dst = NULL;
1,285✔
3570
  size_t size = 0;
1,285✔
3571
  if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
1,285!
3572
    STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, (void*)value, (int32_t)vLen);
100!
3573
    return code;
100✔
3574
  }
3575

3576
  code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
1,185✔
3577
  if (code != 0) {
1,185!
3578
    return code;
×
3579
  }
3580
  STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, (int32_t)size);
1,185!
3581
  taosMemoryFree(dst);
1,185!
3582

3583
  return code;
1,185✔
3584
}
3585
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
753✔
3586
  stDebug("streamStateSessionGet_rocksdb");
753✔
3587
  int              code = 0;
753✔
3588
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
753✔
3589
  SSessionKey      resKey = *key;
753✔
3590
  void*            tmp = NULL;
753✔
3591
  int32_t          vLen = 0;
753✔
3592

3593
  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, &tmp, &vLen);
753✔
3594
  if (code == 0 && key->win.skey == resKey.win.skey) {
753!
3595
    *key = resKey;
753✔
3596

3597
    if (pVal) {
753!
3598
      *pVal = tmp;
753✔
3599
      tmp = NULL;
753✔
3600
    };
3601
    if (pVLen) *pVLen = vLen;
753!
3602
  } else {
3603
    code = -1;
×
3604
  }
3605

3606
  taosMemoryFree(tmp);
753!
3607
  streamStateFreeCur(pCur);
753✔
3608
  return code;
753✔
3609
}
3610

3611
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) {
2,005✔
3612
  int              code = 0;
2,005✔
3613
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
2,005✔
3614
  STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);
2,005!
3615
  return code;
2,005✔
3616
}
3617

3618
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, int64_t groupId) {
1,690✔
3619
  stDebug("streamStateSessionSeekToLast_rocksdb");
1,690✔
3620

3621
  int32_t code = 0;
1,690✔
3622

3623
  SSessionKey      maxSessionKey = {.groupId = groupId, .win = {.skey = INT64_MAX, .ekey = INT64_MAX}};
1,690✔
3624
  SStateSessionKey maxKey = {.key = maxSessionKey, .opNum = pState->number};
1,690✔
3625

3626
  STREAM_STATE_PUT_ROCKSDB(pState, "sess", &maxKey, "", 0);
1,690!
3627
  if (code != 0) {
1,692!
3628
    return NULL;
×
3629
  }
3630
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
1,692✔
3631

3632
  SStreamStateCur* pCur = createStreamStateCursor();
1,692✔
3633
  pCur->number = pState->number;
1,692✔
3634
  pCur->db = wrapper->db;
1,692✔
3635
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
3,384✔
3636
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
1,692✔
3637

3638
  char    buf[128] = {0};
1,692✔
3639
  int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf);
1,692✔
3640
  rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
1,692✔
3641
  rocksdb_iter_prev(pCur->iter);
1,692✔
3642
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
1,692!
3643
    rocksdb_iter_prev(pCur->iter);
×
3644
  }
3645

3646
  if (!rocksdb_iter_valid(pCur->iter)) {
1,692✔
3647
    streamStateFreeCur(pCur);
1,690✔
3648
    pCur = NULL;
1,690✔
3649
  }
3650

3651
  STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey);
1,692!
3652
  return pCur;
1,692✔
3653
}
3654

3655
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) {
4✔
3656
  stDebug("streamStateCurPrev_rocksdb");
4!
3657
  if (!pCur) return -1;
4!
3658

3659
  rocksdb_iter_prev(pCur->iter);
4✔
3660
  return 0;
4✔
3661
}
3662

3663
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
3,721✔
3664
  stDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
3,721✔
3665

3666
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
3,721✔
3667
  SStreamStateCur* pCur = createStreamStateCursor();
3,721✔
3668
  if (pCur == NULL) {
3,722!
3669
    return NULL;
×
3670
  }
3671

3672
  pCur->number = pState->number;
3,722✔
3673
  pCur->db = wrapper->db;
3,722✔
3674
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
7,444✔
3675
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
3,722✔
3676
  if (pCur->iter == NULL) {
3,722!
3677
    streamStateFreeCur(pCur);
×
3678
    return NULL;
×
3679
  }
3680

3681
  char             buf[128] = {0};
3,722✔
3682
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
3,722✔
3683
  int              len = stateSessionKeyEncode(&sKey, buf);
3,722✔
3684
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
3,720✔
3685
    streamStateFreeCur(pCur);
1,973✔
3686
    return NULL;
1,973✔
3687
  }
3688
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_prev(pCur->iter);
1,748!
3689

3690
  if (!rocksdb_iter_valid(pCur->iter)) {
1,748!
3691
    streamStateFreeCur(pCur);
×
3692
    return NULL;
×
3693
  }
3694

3695
  int32_t          c = 0;
1,749✔
3696
  size_t           klen;
3697
  const char*      iKey = rocksdb_iter_key(pCur->iter, &klen);
1,749✔
3698
  SStateSessionKey curKey = {0};
1,747✔
3699
  TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
1,747✔
3700
  if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
1,747✔
3701

3702
  if (!rocksdb_iter_valid(pCur->iter)) {
765!
3703
    streamStateFreeCur(pCur);
×
3704
    return NULL;
×
3705
  }
3706

3707
  rocksdb_iter_prev(pCur->iter);
764✔
3708
  if (!rocksdb_iter_valid(pCur->iter)) {
765✔
3709
    streamStateFreeCur(pCur);
187✔
3710
    return NULL;
187✔
3711
  }
3712
  return pCur;
578✔
3713
}
3714
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
3,057✔
3715
  stDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
3,057✔
3716
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
3,057✔
3717
  SStreamStateCur* pCur = createStreamStateCursor();
3,057✔
3718
  if (pCur == NULL) {
3,057!
3719
    return NULL;
×
3720
  }
3721
  pCur->db = wrapper->db;
3,057✔
3722
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
6,114✔
3723
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
3,057✔
3724
  pCur->number = pState->number;
3,057✔
3725

3726
  char             buf[128] = {0};
3,057✔
3727
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
3,057✔
3728
  int              len = stateSessionKeyEncode(&sKey, buf);
3,057✔
3729

3730
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
3,057✔
3731
    streamStateFreeCur(pCur);
1,789✔
3732
    return NULL;
1,789✔
3733
  }
3734
  if (iterValueIsStale(pCur->iter)) {
1,268!
3735
    streamStateFreeCur(pCur);
×
3736
    return NULL;
×
3737
  }
3738
  size_t           klen;
3739
  const char*      iKey = rocksdb_iter_key(pCur->iter, &klen);
1,268✔
3740
  SStateSessionKey curKey = {0};
1,268✔
3741
  TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
1,268✔
3742
  if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur;
1,268✔
3743

3744
  rocksdb_iter_next(pCur->iter);
73✔
3745
  if (!rocksdb_iter_valid(pCur->iter)) {
73!
3746
    streamStateFreeCur(pCur);
73✔
3747
    return NULL;
73✔
3748
  }
3749
  return pCur;
×
3750
}
3751

3752
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
3,538✔
3753
  stDebug("streamStateSessionSeekKeyNext_rocksdb");
3,538✔
3754
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
3,538✔
3755
  SStreamStateCur* pCur = createStreamStateCursor();
3,538✔
3756
  if (pCur == NULL) {
3,539!
3757
    return NULL;
×
3758
  }
3759
  pCur->db = wrapper->db;
3,539✔
3760
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
7,079✔
3761
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
3,539✔
3762
  pCur->number = pState->number;
3,540✔
3763

3764
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
3,540✔
3765

3766
  char buf[128] = {0};
3,540✔
3767
  int  len = stateSessionKeyEncode(&sKey, buf);
3,540✔
3768
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
3,540✔
3769
    streamStateFreeCur(pCur);
2,257✔
3770
    return NULL;
2,257✔
3771
  }
3772
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_next(pCur->iter);
1,283!
3773
  if (!rocksdb_iter_valid(pCur->iter)) {
1,282!
3774
    streamStateFreeCur(pCur);
×
3775
    return NULL;
×
3776
  }
3777

3778
  size_t           klen;
3779
  const char*      iKey = rocksdb_iter_key(pCur->iter, &klen);
1,283✔
3780
  SStateSessionKey curKey = {0};
1,283✔
3781
  TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
1,283✔
3782
  if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur;
1,283✔
3783

3784
  rocksdb_iter_next(pCur->iter);
355✔
3785
  if (!rocksdb_iter_valid(pCur->iter)) {
355✔
3786
    streamStateFreeCur(pCur);
353✔
3787
    return NULL;
353✔
3788
  }
3789
  return pCur;
2✔
3790
}
3791

3792
SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
1,882✔
3793
  stDebug("streamStateSessionSeekKeyPrev_rocksdb");
1,882✔
3794
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
1,882✔
3795
  SStreamStateCur* pCur = createStreamStateCursor();
1,882✔
3796
  if (pCur == NULL) {
1,882!
3797
    return NULL;
×
3798
  }
3799
  pCur->db = wrapper->db;
1,882✔
3800
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
3,764✔
3801
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
1,882✔
3802
  pCur->number = pState->number;
1,882✔
3803

3804
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
1,882✔
3805

3806
  char buf[128] = {0};
1,882✔
3807
  int  len = stateSessionKeyEncode(&sKey, buf);
1,882✔
3808
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
1,882✔
3809
    streamStateFreeCur(pCur);
1,534✔
3810
    return NULL;
1,534✔
3811
  }
3812
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_prev(pCur->iter);
348!
3813
  if (!rocksdb_iter_valid(pCur->iter)) {
348!
3814
    streamStateFreeCur(pCur);
×
3815
    return NULL;
×
3816
  }
3817

3818
  size_t           klen;
3819
  const char*      iKey = rocksdb_iter_key(pCur->iter, &klen);
348✔
3820
  SStateSessionKey curKey = {0};
348✔
3821
  TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
348✔
3822
  if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) return pCur;
348✔
3823

3824
  rocksdb_iter_prev(pCur->iter);
322✔
3825
  if (!rocksdb_iter_valid(pCur->iter)) {
322✔
3826
    streamStateFreeCur(pCur);
9✔
3827
    return NULL;
9✔
3828
  }
3829
  return pCur;
313✔
3830
}
3831

3832
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey,
14,025✔
3833
                                             void** pVal, int32_t* pVLen) {
3834
  if (!pCur) {
14,025✔
3835
    return -1;
8,817✔
3836
  }
3837
  SStateSessionKey ktmp = {0};
5,208✔
3838
  size_t           kLen = 0, vLen = 0;
5,208✔
3839

3840
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
5,208!
3841
    return -1;
175✔
3842
  }
3843
  const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
5,034✔
3844
  TAOS_UNUSED(stateSessionKeyDecode((void*)&ktmp, (char*)curKey));
5,034✔
3845

3846
  if (pVal != NULL) *pVal = NULL;
5,034✔
3847
  if (pVLen != NULL) *pVLen = 0;
5,034✔
3848

3849
  SStateSessionKey* pKTmp = &ktmp;
5,034✔
3850
  const char*       vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
5,034✔
3851
  char*             val = NULL;
5,034✔
3852
  int32_t           len = valueDecode((void*)vval, vLen, NULL, &val);
5,034✔
3853
  if (len < 0) {
5,032!
3854
    taosMemoryFree(val);
×
3855
    return -1;
×
3856
  }
3857

3858
  if (pKTmp->opNum != pCur->number) {
5,032✔
3859
    taosMemoryFree(val);
68!
3860
    return -1;
68✔
3861
  }
3862
  if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
4,964✔
3863
    taosMemoryFree(val);
2,538!
3864
    return -1;
2,539✔
3865
  }
3866

3867
  char*  tVal = val;
2,426✔
3868
  size_t tVlen = len;
2,426✔
3869

3870
  if (pVal != NULL) {
2,426✔
3871
    if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
2,357!
3872
      int code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen);
2,152✔
3873
      if (code != 0) {
2,152!
3874
        taosMemoryFree(val);
×
3875
        return code;
×
3876
      }
3877
      taosMemoryFree(val);
2,152!
3878
      *pVal = (char*)tVal;
2,152✔
3879
    } else {
3880
      *pVal = (char*)tVal;
205✔
3881
    }
3882
  } else {
3883
    taosMemoryFree(val);
69!
3884
  }
3885

3886
  if (pVLen != NULL) *pVLen = (int32_t)tVlen;
2,427✔
3887

3888
  *pKey = pKTmp->key;
2,427✔
3889
  return 0;
2,427✔
3890
}
3891
// fill cf
3892
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
1,753✔
3893
  int code = 0;
1,753✔
3894

3895
  STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen);
1,753!
3896
  return code;
1,753✔
3897
}
3898

3899
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
2,233✔
3900
  int code = 0;
2,233✔
3901

3902
  STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
2,233!
3903
  return code;
2,233✔
3904
}
3905
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
437✔
3906
  int code = 0;
437✔
3907
  STREAM_STATE_DEL_ROCKSDB(pState, "fill", key);
437!
3908
  return code;
438✔
3909
}
3910

3911
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
306✔
3912
  stDebug("streamStateFillGetCur_rocksdb");
306✔
3913
  SStreamStateCur* pCur = createStreamStateCursor();
306✔
3914
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
305✔
3915

3916
  if (pCur == NULL) return NULL;
305!
3917

3918
  pCur->db = wrapper->db;
305✔
3919
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
612✔
3920
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
305✔
3921
  pCur->number = pState->number;
307✔
3922

3923
  char buf[128] = {0};
307✔
3924
  int  len = winKeyEncode((void*)key, buf);
307✔
3925
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
307✔
3926
    streamStateFreeCur(pCur);
5✔
3927
    return NULL;
5✔
3928
  }
3929
  if (iterValueIsStale(pCur->iter)) {
302!
3930
    streamStateFreeCur(pCur);
×
3931
    return NULL;
×
3932
  }
3933

3934
  if (rocksdb_iter_valid(pCur->iter)) {
302!
3935
    size_t  kLen;
3936
    SWinKey curKey;
3937
    char*   keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
302✔
3938
    TAOS_UNUSED(winKeyDecode((void*)&curKey, keyStr));
302✔
3939
    if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) {
302✔
3940
      return pCur;
244✔
3941
    }
3942
  }
3943

3944
  streamStateFreeCur(pCur);
58✔
3945
  return NULL;
58✔
3946
}
3947
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
4,563✔
3948
  if (!pCur) {
4,563!
3949
    return -1;
×
3950
  }
3951
  SWinKey winKey;
3952
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
4,563!
3953
    return -1;
1,611✔
3954
  }
3955
  size_t klen, vlen;
3956
  char*  keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
2,954✔
3957
  TAOS_UNUSED(winKeyDecode(&winKey, keyStr));
2,953✔
3958

3959
  const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
2,954✔
3960
  int32_t     len = valueDecode((void*)valStr, vlen, NULL, (char**)pVal);
2,954✔
3961
  if (len < 0) {
2,953!
3962
    return -1;
×
3963
  }
3964
  if (pVLen != NULL) *pVLen = len;
2,953✔
3965

3966
  *pKey = winKey;
2,953✔
3967
  return 0;
2,953✔
3968
}
3969

3970
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
436✔
3971
  stDebug("streamStateFillSeekKeyNext_rocksdb");
436✔
3972
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
436✔
3973
  SStreamStateCur* pCur = createStreamStateCursor();
436✔
3974
  if (!pCur) {
437!
3975
    return NULL;
×
3976
  }
3977

3978
  pCur->db = wrapper->db;
437✔
3979
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
874✔
3980
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
437✔
3981
  pCur->number = pState->number;
437✔
3982

3983
  char buf[128] = {0};
437✔
3984
  int  len = winKeyEncode((void*)key, buf);
437✔
3985
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
437✔
3986
    streamStateFreeCur(pCur);
8✔
3987
    return NULL;
8✔
3988
  }
3989
  // skip stale data
3990
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
429!
3991
    rocksdb_iter_next(pCur->iter);
×
3992
  }
3993

3994
  if (rocksdb_iter_valid(pCur->iter)) {
429!
3995
    SWinKey curKey;
3996
    size_t  kLen = 0;
429✔
3997
    char*   keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
429✔
3998
    TAOS_UNUSED(winKeyDecode((void*)&curKey, keyStr));
429✔
3999
    if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) < 0) {
429!
4000
      return pCur;
×
4001
    }
4002
    rocksdb_iter_next(pCur->iter);
429✔
4003
    return pCur;
429✔
4004
  }
4005
  streamStateFreeCur(pCur);
×
4006
  return NULL;
×
4007
}
4008
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
1,933✔
4009
  stDebug("streamStateFillSeekKeyPrev_rocksdb");
1,933✔
4010
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
1,933✔
4011
  SStreamStateCur* pCur = createStreamStateCursor();
1,933✔
4012
  if (pCur == NULL) {
1,934!
4013
    return NULL;
×
4014
  }
4015

4016
  pCur->db = wrapper->db;
1,934✔
4017
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
3,870✔
4018
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
1,934✔
4019
  pCur->number = pState->number;
1,936✔
4020

4021
  char buf[128] = {0};
1,936✔
4022
  int  len = winKeyEncode((void*)key, buf);
1,936✔
4023
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
1,936✔
4024
    streamStateFreeCur(pCur);
77✔
4025
    return NULL;
77✔
4026
  }
4027
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
1,859!
4028
    rocksdb_iter_prev(pCur->iter);
×
4029
  }
4030

4031
  if (rocksdb_iter_valid(pCur->iter)) {
1,859!
4032
    SWinKey curKey;
4033
    size_t  kLen = 0;
1,859✔
4034
    char*   keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
1,859✔
4035
    TAOS_UNUSED(winKeyDecode((void*)&curKey, keyStr));
1,859✔
4036
    if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) {
1,859✔
4037
      return pCur;
2✔
4038
    }
4039
    rocksdb_iter_prev(pCur->iter);
1,856✔
4040
    return pCur;
1,857✔
4041
  }
4042

4043
  streamStateFreeCur(pCur);
×
4044
  return NULL;
×
4045
}
4046

4047
SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) {
61✔
4048
  SWinKey key = {.groupId = UINT64_MAX, .ts = INT64_MAX};
61✔
4049
  return streamStateFillSeekKeyPrev_rocksdb(pState, &key);
61✔
4050
}
4051

4052
#ifdef BUILD_NO_CALL
4053
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
4054
  stDebug("streamStateSessionGetKeyByRange_rocksdb");
4055
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
4056
  SStreamStateCur* pCur = createStreamStateCursor();
4057
  if (pCur == NULL) {
4058
    return -1;
4059
  }
4060
  pCur->db = wrapper->db;
4061
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
4062
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
4063
  pCur->number = pState->number;
4064

4065
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
4066
  int32_t          c = 0;
4067
  char             buf[128] = {0};
4068
  int              len = stateSessionKeyEncode(&sKey, buf);
4069
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
4070
    streamStateFreeCur(pCur);
4071
    return -1;
4072
  }
4073

4074
  size_t           kLen;
4075
  const char*      iKeyStr = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
4076
  SStateSessionKey iKey = {0};
4077
  stateSessionKeyDecode(&iKey, (char*)iKeyStr);
4078

4079
  c = stateSessionKeyCmpr(&sKey, sizeof(sKey), &iKey, sizeof(iKey));
4080

4081
  SSessionKey resKey = *key;
4082
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
4083
  if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
4084
    *curKey = resKey;
4085
    streamStateFreeCur(pCur);
4086
    return code;
4087
  }
4088

4089
  if (c > 0) {
4090
    streamStateCurNext_rocksdb(pCur);
4091
    code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
4092
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
4093
      *curKey = resKey;
4094
      streamStateFreeCur(pCur);
4095
      return code;
4096
    }
4097
  } else if (c < 0) {
4098
    streamStateCurPrev(pState, pCur);
4099
    code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
4100
    if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
4101
      *curKey = resKey;
4102
      streamStateFreeCur(pCur);
4103
      return code;
4104
    }
4105
  }
4106

4107
  streamStateFreeCur(pCur);
4108
  return -1;
4109
}
4110
#endif
4111

4112
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
1,421✔
4113
                                                int32_t* pVLen) {
4114
  stDebug("streamStateSessionAddIfNotExist_rocksdb");
1,421✔
4115
  // todo refactor
4116
  int32_t     res = 0;
1,421✔
4117
  SSessionKey originKey = *key;
1,421✔
4118
  SSessionKey searchKey = *key;
1,421✔
4119
  searchKey.win.skey = key->win.skey - gap;
1,421✔
4120
  searchKey.win.ekey = key->win.ekey + gap;
1,421✔
4121
  int32_t valSize = *pVLen;
1,421✔
4122

4123
  void* tmp = taosMemoryMalloc(valSize);
1,421!
4124
  if (tmp == NULL) {
1,421!
4125
    return terrno;
×
4126
  }
4127

4128
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
1,421✔
4129
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
1,421✔
4130

4131
  if (code == 0) {
1,421✔
4132
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
174✔
4133
      memcpy(tmp, *pVal, *pVLen);
139✔
4134
      taosMemoryFreeClear(*pVal);
139!
4135
      goto _end;
139✔
4136
    }
4137
    taosMemoryFreeClear(*pVal);
35!
4138
    streamStateCurNext_rocksdb(pCur);
35✔
4139
  } else {
4140
    *key = originKey;
1,247✔
4141
    streamStateFreeCur(pCur);
1,247✔
4142
    taosMemoryFreeClear(*pVal);
1,247!
4143
    pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
1,247✔
4144
  }
4145

4146
  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
1,282✔
4147
  if (code == 0) {
1,282✔
4148
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
16!
4149
      memcpy(tmp, *pVal, *pVLen);
×
4150
      goto _end;
×
4151
    }
4152
  }
4153

4154
  *key = originKey;
1,281✔
4155
  res = 1;
1,281✔
4156
  memset(tmp, 0, valSize);
1,281✔
4157

4158
_end:
1,420✔
4159
  taosMemoryFree(*pVal);
1,420!
4160
  *pVal = tmp;
1,421✔
4161
  streamStateFreeCur(pCur);
1,421✔
4162
  return res;
1,421✔
4163
}
4164
void streamStateSessionClear_rocksdb(SStreamState* pState) {
588✔
4165
  stDebug("streamStateSessionClear_rocksdb");
588✔
4166
  SSessionKey      key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
588✔
4167
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);
588✔
4168

4169
  while (1) {
577✔
4170
    SSessionKey delKey = {0};
1,165✔
4171
    void*       buf = NULL;
1,165✔
4172
    int32_t     size = 0;
1,165✔
4173
    int32_t     code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &delKey, &buf, &size);
1,165✔
4174
    if (code == 0 && size > 0) {
1,165!
4175
      memset(buf, 0, size);
577✔
4176
      // refactor later
4177
      TAOS_UNUSED(streamStateSessionPut_rocksdb(pState, &delKey, buf, size));
577✔
4178
    } else {
4179
      taosMemoryFreeClear(buf);
588!
4180
      break;
588✔
4181
    }
4182
    taosMemoryFreeClear(buf);
577!
4183

4184
    streamStateCurNext_rocksdb(pCur);
577✔
4185
  }
4186
  streamStateFreeCur(pCur);
588✔
4187
}
588✔
4188
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
742✔
4189
                                              int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
4190
  stDebug("streamStateStateAddIfNotExist_rocksdb");
742✔
4191
  // todo refactor
4192
  int32_t     res = 0;
743✔
4193
  SSessionKey tmpKey = *key;
743✔
4194
  int32_t     valSize = *pVLen;
743✔
4195
  void*       tmp = taosMemoryMalloc(valSize);
743!
4196
  if (!tmp) {
743!
4197
    return -1;
×
4198
  }
4199

4200
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
743✔
4201
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
742✔
4202
  if (code == 0) {
742✔
4203
    if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
33!
4204
      memcpy(tmp, *pVal, valSize);
5✔
4205
      goto _end;
5✔
4206
    }
4207

4208
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
28✔
4209
    if (fn(pKeyData, stateKey) == true) {
28✔
4210
      memcpy(tmp, *pVal, valSize);
4✔
4211
      goto _end;
4✔
4212
    }
4213

4214
    streamStateCurNext_rocksdb(pCur);
24✔
4215
  } else {
4216
    *key = tmpKey;
709✔
4217
    streamStateFreeCur(pCur);
709✔
4218
    pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
709✔
4219
  }
4220
  taosMemoryFreeClear(*pVal);
734!
4221
  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
734✔
4222
  if (code == 0) {
734✔
4223
    void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
5✔
4224
    if (fn(pKeyData, stateKey) == true) {
5!
4225
      memcpy(tmp, *pVal, valSize);
×
4226
      goto _end;
×
4227
    }
4228
  }
4229
  taosMemoryFreeClear(*pVal);
734!
4230

4231
  *key = tmpKey;
734✔
4232
  res = 1;
734✔
4233
  memset(tmp, 0, valSize);
734✔
4234

4235
_end:
743✔
4236
  taosMemoryFreeClear(*pVal);
743!
4237
  *pVal = tmp;
743✔
4238
  streamStateFreeCur(pCur);
743✔
4239
  return res;
743✔
4240
}
4241

4242
//  partag cf
4243
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
42✔
4244
  int    code = 0;
42✔
4245
  char*  dst = NULL;
42✔
4246
  size_t size = 0;
42✔
4247
  if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL || tag == NULL) {
42!
4248
    STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
42!
4249
    return code;
42✔
4250
  }
4251
  code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, tag, tagLen, &dst, &size);
×
4252
  if (code != 0) {
×
4253
    return code;
×
4254
  }
4255
  STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, dst, (int32_t)size);
×
4256
  taosMemoryFree(dst);
×
4257
  return code;
×
4258
}
4259

4260
void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t groupId, SStreamStateCur* pCur) {
1,258✔
4261
  if (pCur == NULL) {
1,258!
4262
    return;
862✔
4263
  }
4264
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
1,258✔
4265
  pCur->number = pState->number;
1,258✔
4266
  pCur->db = wrapper->db;
1,258✔
4267
  pCur->iter = streamStateIterCreate(pState, "partag", (rocksdb_snapshot_t**)&pCur->snapshot,
2,516✔
4268
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
1,258✔
4269
  int i = streamStateGetCfIdx(pState, "partag");
1,258✔
4270
  if (i < 0) {
1,258!
4271
    stError("streamState failed to put to cf name:%s", "partag");
×
4272
    return;
×
4273
  }
4274

4275
  char    buf[128] = {0};
1,258✔
4276
  int32_t klen = ginitDict[i].enFunc((void*)&groupId, buf);
1,258✔
4277
  if (!streamStateIterSeekAndValid(pCur->iter, buf, klen)) {
1,258✔
4278
    return;
862✔
4279
  }
4280
  // skip ttl expired data
4281
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
396!
4282
    rocksdb_iter_next(pCur->iter);
×
4283
  }
4284

4285
  if (rocksdb_iter_valid(pCur->iter)) {
396!
4286
    int64_t curGroupId;
4287
    size_t  kLen = 0;
396✔
4288
    char*   keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
396✔
4289
    TAOS_UNUSED(parKeyDecode((void*)&curGroupId, keyStr));
396✔
4290
    if (curGroupId > groupId) return;
396!
4291

4292
    rocksdb_iter_next(pCur->iter);
396✔
4293
  }
4294
}
4295

4296
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal,
1,258✔
4297
                                            int32_t* pVLen) {
4298
  stDebug("streamStateFillGetKVByCur_rocksdb");
1,258!
4299
  if (!pCur) {
1,258!
4300
    return -1;
×
4301
  }
4302
  SWinKey winKey;
4303
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
1,258!
4304
    return -1;
1,258✔
4305
  }
4306

4307
  size_t klen, vlen;
4308
  char*  keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
×
4309
  (void)parKeyDecode(pGroupId, keyStr);
×
4310

4311
  if (pVal) {
×
4312
    const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
×
4313
    int32_t     len = valueDecode((void*)valStr, vlen, NULL, (char**)pVal);
×
4314
    if (len < 0) {
×
4315
      return -1;
×
4316
    }
4317
    if (pVLen != NULL) *pVLen = len;
×
4318
  }
4319

4320
  return 0;
×
4321
}
4322

4323
#ifdef BUILD_NO_CALL
4324
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
4325
  int    code = 0;
4326
  char*  tVal;
4327
  size_t tValLen = 0;
4328
  STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, &tVal, &tValLen);
4329
  if (code != 0) {
4330
    taosMemoryFree(tVal);
4331
    return code;
4332
  }
4333
  code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, (size_t*)tagLen);
4334
  taosMemoryFree(tVal);
4335

4336
  return code;
4337
}
4338
#endif
4339
// parname cfg
4340
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
5,462✔
4341
  int code = 0;
5,462✔
4342
  STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN);
5,462!
4343
  return code;
5,465✔
4344
}
4345
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) {
561,044✔
4346
  int    code = 0;
561,044✔
4347
  size_t tagLen;
4348
  STREAM_STATE_GET_ROCKSDB(pState, "parname", &groupId, pVal, &tagLen);
561,311!
4349
  return code;
561,965✔
4350
}
4351

4352
int32_t streamStateDeleteParName_rocksdb(SStreamState* pState, int64_t groupId) {
55✔
4353
  int    code = 0;
55✔
4354
  STREAM_STATE_DEL_ROCKSDB(pState, "parname", &groupId);
55!
4355
  return code;
55✔
4356
}
4357

4358
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
100✔
4359
  int code = 0;
100✔
4360
  STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);
100!
4361
  return code;
100✔
4362
}
4363
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
21,936✔
4364
  int code = 0;
21,936✔
4365
  STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen);
21,936!
4366
  return code;
21,937✔
4367
}
4368
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
×
4369
  int code = 0;
×
4370
  STREAM_STATE_DEL_ROCKSDB(pState, "default", key);
×
4371
  return code;
×
4372
}
4373

4374
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
1✔
4375
  int   code = 0;
1✔
4376
  char* err = NULL;
1✔
4377

4378
  STaskDbWrapper*        wrapper = pState->pTdbState->pOwner->pBackend;
1✔
4379
  rocksdb_snapshot_t*    snapshot = NULL;
1✔
4380
  rocksdb_readoptions_t* readopts = NULL;
1✔
4381
  rocksdb_iterator_t*    pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
1✔
4382
  if (pIter == NULL) {
1!
4383
    return -1;
×
4384
  }
4385
  size_t klen = 0;
1✔
4386
  rocksdb_iter_seek(pIter, start, strlen(start));
1✔
4387
  while (rocksdb_iter_valid(pIter)) {
101✔
4388
    const char* key = rocksdb_iter_key(pIter, &klen);
100✔
4389
    int32_t     vlen = 0;
100✔
4390
    const char* vval = rocksdb_iter_value(pIter, (size_t*)&vlen);
100✔
4391
    char*       val = NULL;
100✔
4392
    int32_t     len = valueDecode((void*)vval, vlen, NULL, NULL);
100✔
4393
    if (len < 0) {
100!
4394
      rocksdb_iter_next(pIter);
×
4395
      continue;
×
4396
    }
4397

4398
    if (end != NULL && strcmp(key, end) > 0) {
100!
4399
      break;
×
4400
    }
4401
    if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
100!
4402
      int64_t checkPoint = 0;
100✔
4403
      if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
100!
4404
        if (taosArrayPush(result, &checkPoint) == NULL) {
×
4405
          code = terrno;
×
4406
          break;
×
4407
        }
4408
      }
4409
    } else {
4410
      break;
4411
    }
4412
    rocksdb_iter_next(pIter);
100✔
4413
  }
4414
  rocksdb_release_snapshot(wrapper->db, snapshot);
1✔
4415
  rocksdb_readoptions_destroy(readopts);
1✔
4416
  rocksdb_iter_destroy(pIter);
1✔
4417
  return code;
1✔
4418
}
4419
#ifdef BUILD_NO_CALL
4420
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
4421
  SStreamStateCur* pCur = createStreamStateCursor();
4422
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
4423

4424
  pCur->db = wrapper->db;
4425
  pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot,
4426
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
4427
  pCur->number = pState->number;
4428
  return pCur;
4429
}
4430
bool streamDefaultIterValid_rocksdb(void* iter) {
4431
  if (iter) {
4432
    return false;
4433
  }
4434
  SStreamStateCur* pCur = iter;
4435
  return (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) ? true : false;
4436
}
4437
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
4438
  SStreamStateCur* pCur = iter;
4439
  rocksdb_iter_seek(pCur->iter, key, strlen(key));
4440
}
4441
void streamDefaultIterNext_rocksdb(void* iter) {
4442
  SStreamStateCur* pCur = iter;
4443
  rocksdb_iter_next(pCur->iter);
4444
}
4445
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
4446
  SStreamStateCur* pCur = iter;
4447
  return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len);
4448
}
4449
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
4450
  SStreamStateCur* pCur = iter;
4451
  char*            ret = NULL;
4452

4453
  int32_t     vlen = 0;
4454
  const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vlen);
4455
  *len = valueDecode((void*)val, vlen, NULL, &ret);
4456
  if (*len < 0) {
4457
    taosMemoryFree(ret);
4458
    return NULL;
4459
  }
4460

4461
  return ret;
4462
}
4463
#endif
4464
// batch func
4465
void* streamStateCreateBatch() {
8,996✔
4466
  rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create();
8,996✔
4467
  return pBatch;
8,998✔
4468
}
4469
int32_t streamStateGetBatchSize(void* pBatch) {
1,813,200✔
4470
  if (pBatch == NULL) return 0;
1,813,200!
4471
  return rocksdb_writebatch_count(pBatch);
1,813,200✔
4472
}
4473

4474
void    streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
8,463✔
4475
void    streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
8,997✔
4476
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
4,558✔
4477
                            void* val, int32_t vlen, int64_t ttl) {
4478
  int32_t         code = 0;
4,558✔
4479
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
4,558✔
4480
  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));
4,558✔
4481

4482
  int i = streamStateGetCfIdx(pState, cfKeyName);
4,559✔
4483
  if (i < 0) {
4,559!
4484
    stError("streamState failed to put to cf name:%s", cfKeyName);
×
4485
    return -1;
×
4486
  }
4487

4488
  char    buf[128] = {0};
4,559✔
4489
  int32_t klen = ginitDict[i].enFunc((void*)key, buf);
4,559✔
4490

4491
  char*   ttlV = NULL;
4,557✔
4492
  int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV);
4,557✔
4493

4494
  rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[i].idx];
4,557✔
4495
  rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
4,557✔
4496
  taosMemoryFree(ttlV);
4,559!
4497

4498
  {
4499
    char tbuf[256] = {0};
4,559✔
4500
    TAOS_UNUSED(ginitDict[i].toStrFunc((void*)key, tbuf));
4,559✔
4501
    stTrace("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen);
4,559✔
4502
  }
4503
  return 0;
4,559✔
4504
}
4505

4506
int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key,
1,807,918✔
4507
                                    void* val, int32_t vlen, int64_t ttl, void* tmpBuf) {
4508
  int32_t code = 0;
1,807,918✔
4509
  char    buf[128] = {0};
1,807,918✔
4510

4511
  char*  dst = NULL;
1,807,918✔
4512
  size_t size = 0;
1,807,918✔
4513
  if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
1,807,918✔
4514
    dst = val;
394✔
4515
    size = vlen;
394✔
4516
  } else {
4517
    code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size);
1,807,524✔
4518
    if (code != 0) {
1,804,242!
4519
      return code;
×
4520
    }
4521
  }
4522
  int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf);
1,804,636✔
4523
  char*   ttlV = tmpBuf;
1,805,377✔
4524
  int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV);
1,805,377✔
4525

4526
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
1,807,068✔
4527

4528
  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));
1,807,068✔
4529

4530
  rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
1,808,309✔
4531
  rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
1,808,309✔
4532

4533
  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
1,808,449!
4534
    taosMemoryFree(dst);
1,808,106!
4535
  }
4536

4537
  if (tmpBuf == NULL) {
1,808,436!
4538
    taosMemoryFree(ttlV);
×
4539
  }
4540

4541
  {
4542
    char tbuf[256] = {0};
1,808,436✔
4543
    TAOS_UNUSED(ginitDict[cfIdx].toStrFunc((void*)key, tbuf));
1,808,436✔
4544
    stTrace("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key);
1,808,204✔
4545
  }
4546
  return 0;
1,808,212✔
4547
}
4548
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
13,025✔
4549
  char*           err = NULL;
13,025✔
4550
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
13,025✔
4551
  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));
13,025✔
4552
  rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
13,025✔
4553
  if (err != NULL) {
13,025!
4554
    stError("streamState failed to write batch, err:%s", err);
×
4555
    taosMemoryFree(err);
×
4556
    return -1;
×
4557
  } else {
4558
    stDebug("write batch to backend:%p", wrapper->db);
13,025✔
4559
  }
4560
  return 0;
13,025✔
4561
}
4562
uint32_t nextPow2(uint32_t x) {
13✔
4563
  if (x <= 1) return 2;
13✔
4564
  x = x - 1;
11✔
4565
  x = x | (x >> 1);
11✔
4566
  x = x | (x >> 2);
11✔
4567
  x = x | (x >> 4);
11✔
4568
  x = x | (x >> 8);
11✔
4569
  x = x | (x >> 16);
11✔
4570
  return x + 1;
11✔
4571
}
4572

4573
#ifdef BUILD_NO_CALL
4574
int32_t copyFiles(const char* src, const char* dst) {
4575
  int32_t code = 0;
4576
  // opt later, just hard link
4577
  int32_t sLen = strlen(src);
4578
  int32_t dLen = strlen(dst);
4579
  char*   srcName = taosMemoryCalloc(1, sLen + 64);
4580
  char*   dstName = taosMemoryCalloc(1, dLen + 64);
4581

4582
  TdDirPtr pDir = taosOpenDir(src);
4583
  if (pDir == NULL) {
4584
    taosMemoryFree(srcName);
4585
    taosMemoryFree(dstName);
4586
    return -1;
4587
  }
4588

4589
  TdDirEntryPtr de = NULL;
4590
  while ((de = taosReadDir(pDir)) != NULL) {
4591
    char* name = taosGetDirEntryName(de);
4592
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
4593

4594
    sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
4595
    sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
4596
    if (!taosDirEntryIsDir(de)) {
4597
      code = taosCopyFile(srcName, dstName);
4598
      if (code == -1) {
4599
        goto _err;
4600
      }
4601
    }
4602

4603
    memset(srcName, 0, sLen + 64);
4604
    memset(dstName, 0, dLen + 64);
4605
  }
4606

4607
_err:
4608
  taosMemoryFreeClear(srcName);
4609
  taosMemoryFreeClear(dstName);
4610
  taosCloseDir(&pDir);
4611
  return code >= 0 ? 0 : -1;
4612
}
4613
#endif
4614

4615
int32_t isBkdDataMeta(char* name, int32_t len) {
12✔
4616
  const char* pCurrent = "CURRENT";
12✔
4617
  int32_t     currLen = strlen(pCurrent);
12✔
4618

4619
  const char* pManifest = "MANIFEST-";
12✔
4620
  int32_t     maniLen = strlen(pManifest);
12✔
4621

4622
  if (len >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
12!
4623
    return 1;
×
4624
  } else if (len == currLen && strcmp(name, pCurrent) == 0) {
12!
4625
    return 1;
×
4626
  }
4627
  return 0;
12✔
4628
}
4629
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
1✔
4630
  int32_t code = 0;
1✔
4631
  size_t  len = 0;
1✔
4632
  void*   pIter = taosHashIterate(p2, NULL);
1✔
4633
  while (pIter) {
7✔
4634
    char* name = taosHashGetKey(pIter, &len);
6✔
4635
    if (!isBkdDataMeta(name, len) && !taosHashGet(p1, name, len)) {
6!
4636
      int32_t cap = len + 1;
×
4637
      char*   fname = taosMemoryCalloc(1, cap);
×
4638
      if (fname == NULL) {
×
4639
        return terrno;
×
4640
      }
4641
      tstrncpy(fname, name, cap);
×
4642
      if (taosArrayPush(diff, &fname) == NULL) {
×
4643
        taosMemoryFree(fname);
×
4644
        return terrno;
×
4645
      }
4646
    }
4647
    pIter = taosHashIterate(p2, pIter);
6✔
4648
  }
4649
  return code;
1✔
4650
}
4651
int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
1✔
4652
  int32_t code = 0;
1✔
4653

4654
  code = compareHashTableImpl(p1, p2, add);
1✔
4655
  if (code != 0) {
1!
4656
    code = compareHashTableImpl(p2, p1, del);
×
4657
  }
4658

4659
  return code;
1✔
4660
}
4661

4662
void hashTableToDebug(SHashObj* pTbl, char** buf) {
4✔
4663
  size_t  sz = taosHashGetSize(pTbl);
4✔
4664
  int32_t total = 0;
4✔
4665
  int32_t cap = sz * 16 + 4;
4✔
4666

4667
  char* p = taosMemoryCalloc(1, cap);
4!
4668
  if (p == NULL) {
4!
4669
    stError("failed to alloc memory for stream snapshot debug info");
×
4670
    return;
×
4671
  }
4672

4673
  void* pIter = taosHashIterate(pTbl, NULL);
4✔
4674
  while (pIter) {
22✔
4675
    size_t len = 0;
18✔
4676
    char*  name = taosHashGetKey(pIter, &len);
18✔
4677
    if (name == NULL || len <= 0) {
18!
4678
      pIter = taosHashIterate(pTbl, pIter);
×
4679
      continue;
×
4680
    }
4681
    int32_t left = cap - strlen(p);
18✔
4682
    int32_t nBytes = snprintf(p + total, left, "%s,", name);
18✔
4683
    if (nBytes <= 0 || nBytes >= left) {
18!
4684
      stError("failed to debug snapshot info since %s", tstrerror(TSDB_CODE_OUT_OF_RANGE));
×
4685
      taosMemoryFree(p);
×
4686
      return;
×
4687
    }
4688

4689
    pIter = taosHashIterate(pTbl, pIter);
18✔
4690
    total += nBytes;
18✔
4691
  }
4692
  if (total > 0) {
4✔
4693
    p[total - 1] = 0;
3✔
4694
  }
4695
  *buf = p;
4✔
4696
}
4697
void strArrayDebugInfo(SArray* pArr, char** buf) {
4✔
4698
  int32_t sz = taosArrayGetSize(pArr);
4✔
4699
  if (sz <= 0) return;
4✔
4700

4701
  int32_t code = 0;
1✔
4702
  int32_t total = 0, nBytes = 0;
1✔
4703
  int32_t cap = 64 + sz * 64;
1✔
4704

4705
  char* p = (char*)taosMemoryCalloc(1, cap);
1!
4706
  if (p == NULL) {
1!
4707
    stError("failed to alloc memory for stream snapshot debug info");
×
4708
    return;
×
4709
  }
4710

4711
  for (int i = 0; i < sz; i++) {
7✔
4712
    char*   name = taosArrayGetP(pArr, i);
6✔
4713
    int32_t left = cap - strlen(p);
6✔
4714
    nBytes = snprintf(p + total, left, "%s,", name);
6✔
4715
    if (nBytes <= 0 || nBytes >= left) {
6!
4716
      code = TSDB_CODE_OUT_OF_RANGE;
×
4717
      stError("failed to debug snapshot info since %s", tstrerror(code));
×
4718
      taosMemoryFree(p);
×
4719
      return;
×
4720
    }
4721

4722
    total += nBytes;
6✔
4723
  }
4724

4725
  p[total - 1] = 0;
1✔
4726

4727
  *buf = p;
1✔
4728
}
4729
void dbChkpDebugInfo(SDbChkp* pDb) {
2✔
4730
  if (stDebugFlag & DEBUG_INFO) {
2!
4731
    char* p[4] = {NULL};
2✔
4732

4733
    hashTableToDebug(pDb->pSstTbl[pDb->idx], &p[0]);
2✔
4734
    if (p[0]) stTrace("chkp previous file: [%s]", p[0]);
2!
4735

4736
    hashTableToDebug(pDb->pSstTbl[1 - pDb->idx], &p[1]);
2✔
4737
    if (p[1]) stTrace("chkp curr file: [%s]", p[1]);
2!
4738

4739
    strArrayDebugInfo(pDb->pAdd, &p[2]);
2✔
4740
    if (p[2]) stTrace("chkp newly addded file: [%s]", p[2]);
2!
4741

4742
    strArrayDebugInfo(pDb->pDel, &p[3]);
2✔
4743
    if (p[3]) stTrace("chkp newly deleted file: [%s]", p[3]);
2!
4744

4745
    for (int i = 0; i < 4; i++) {
10✔
4746
      taosMemoryFree(p[i]);
8!
4747
    }
4748
  }
4749
}
2✔
4750
int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
2✔
4751
  int32_t code = 0;
2✔
4752
  int32_t nBytes;
4753
  TAOS_UNUSED(taosThreadRwlockWrlock(&p->rwLock));
2✔
4754

4755
  p->preCkptId = p->curChkpId;
2✔
4756
  p->curChkpId = chkpId;
2✔
4757
  const char* pCurrent = "CURRENT";
2✔
4758
  int32_t     currLen = strlen(pCurrent);
2✔
4759

4760
  const char* pManifest = "MANIFEST-";
2✔
4761
  int32_t     maniLen = strlen(pManifest);
2✔
4762

4763
  const char* pSST = ".sst";
2✔
4764
  int32_t     sstLen = strlen(pSST);
2✔
4765

4766
  memset(p->buf, 0, p->len);
2✔
4767

4768
  nBytes =
4769
      snprintf(p->buf, p->len, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
2✔
4770
  if (nBytes <= 0 || nBytes >= p->len) {
2!
4771
    TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
×
4772
    return TSDB_CODE_OUT_OF_RANGE;
×
4773
  }
4774

4775
  taosArrayClearP(p->pAdd, NULL);
2✔
4776
  taosArrayClearP(p->pDel, NULL);
2✔
4777
  taosHashClear(p->pSstTbl[1 - p->idx]);
2✔
4778

4779
  TdDirPtr pDir = taosOpenDir(p->buf);
2✔
4780
  if (pDir == NULL) {
2!
4781
    TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
×
4782
    return terrno;
×
4783
  }
4784

4785
  TdDirEntryPtr de = NULL;
2✔
4786
  int8_t        dummy = 0;
2✔
4787
  while ((de = taosReadDir(pDir)) != NULL) {
26✔
4788
    char* name = taosGetDirEntryName(de);
24✔
4789
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
24✔
4790
    if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) {
20!
4791
      taosMemoryFreeClear(p->pCurrent);
2!
4792

4793
      p->pCurrent = taosStrdup(name);
2!
4794
      if (p->pCurrent == NULL) {
2!
4795
        code = terrno;
×
4796
        break;
×
4797
      }
4798
      continue;
2✔
4799
    }
4800

4801
    if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
18✔
4802
      taosMemoryFreeClear(p->pManifest);
2!
4803
      p->pManifest = taosStrdup(name);
2!
4804
      if (p->pManifest == NULL) {
2!
4805
        code = terrno;
×
4806
        break;
×
4807
      }
4808
      continue;
2✔
4809
    }
4810
    if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) {
16!
4811
      if (taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)) != 0) {
12!
4812
        break;
×
4813
      }
4814
      continue;
12✔
4815
    }
4816
  }
4817
  TAOS_UNUSED(taosCloseDir(&pDir));
2✔
4818
  if (code != 0) {
2!
4819
    TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
×
4820
    return code;
×
4821
  }
4822

4823
  if (p->init == 0) {
2✔
4824
    void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
1✔
4825
    while (pIter) {
7✔
4826
      size_t len = 0;
6✔
4827
      char*  name = taosHashGetKey(pIter, &len);
6✔
4828
      if (name != NULL && !isBkdDataMeta(name, len)) {
6!
4829
        int32_t cap = len + 1;
6✔
4830
        char*   fname = taosMemoryCalloc(1, cap);
6!
4831
        if (fname == NULL) {
6!
4832
          TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
×
4833
          return terrno;
×
4834
        }
4835

4836
        tstrncpy(fname, name, cap);
6✔
4837
        if (taosArrayPush(p->pAdd, &fname) == NULL) {
12!
4838
          taosMemoryFree(fname);
×
4839
          TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
×
4840
          return terrno;
×
4841
        }
4842
      }
4843
      pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
6✔
4844
    }
4845
    if (taosArrayGetSize(p->pAdd) > 0) p->update = 1;
1!
4846

4847
    p->init = 1;
1✔
4848
    p->preCkptId = -1;
1✔
4849
    p->curChkpId = chkpId;
1✔
4850
  } else {
4851
    int32_t code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel);
1✔
4852
    if (code != 0) {
1!
4853
      // dead code
4854
      taosArrayClearP(p->pAdd, NULL);
×
4855
      taosArrayClearP(p->pDel, NULL);
×
4856
      taosHashClear(p->pSstTbl[1 - p->idx]);
×
4857
      p->update = 0;
×
4858
      return code;
×
4859
    }
4860

4861
    if (taosArrayGetSize(p->pAdd) == 0 && taosArrayGetSize(p->pDel) == 0) {
1!
4862
      p->update = 0;
1✔
4863
    }
4864

4865
    p->preCkptId = p->curChkpId;
1✔
4866
    p->curChkpId = chkpId;
1✔
4867
  }
4868

4869
  dbChkpDebugInfo(p);
2✔
4870

4871
  p->idx = 1 - p->idx;
2✔
4872

4873
  TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
2✔
4874

4875
  return code;
2✔
4876
}
4877

4878
void dbChkpDestroy(SDbChkp* pChkp);
4879

4880
int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) {
1✔
4881
  int32_t  code = 0;
1✔
4882
  SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp));
1!
4883
  if (p == NULL) {
1!
4884
    code = terrno;
×
4885
    goto _EXIT;
×
4886
  }
4887

4888
  p->curChkpId = initChkpId;
1✔
4889
  p->preCkptId = -1;
1✔
4890
  p->pSST = taosArrayInit(64, sizeof(void*));
1✔
4891
  if (p->pSST == NULL) {
1!
4892
    code = terrno;
×
4893
    dbChkpDestroy(p);
×
4894
    return code;
×
4895
  }
4896

4897
  p->path = path;
1✔
4898
  p->len = strlen(path) + 128;
1✔
4899
  p->buf = taosMemoryCalloc(1, p->len);
1!
4900
  if (p->buf == NULL) {
1!
4901
    code = terrno;
×
4902
    goto _EXIT;
×
4903
  }
4904

4905
  p->idx = 0;
1✔
4906
  p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1✔
4907
  if (p->pSstTbl[0] == NULL) {
1!
4908
    code = terrno;
×
4909
    goto _EXIT;
×
4910
  }
4911

4912
  p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1✔
4913
  if (p->pSstTbl[1] == NULL) {
1!
4914
    code = terrno;
×
4915
    goto _EXIT;
×
4916
  }
4917

4918
  p->pAdd = taosArrayInit(64, sizeof(void*));
1✔
4919
  if (p->pAdd == NULL) {
1!
4920
    code = terrno;
×
4921
    goto _EXIT;
×
4922
  }
4923

4924
  p->pDel = taosArrayInit(64, sizeof(void*));
1✔
4925
  if (p->pDel == NULL) {
1!
4926
    code = terrno;
×
4927
    goto _EXIT;
×
4928
  }
4929

4930
  p->update = 0;
1✔
4931
  TAOS_UNUSED(taosThreadRwlockInit(&p->rwLock, NULL));
1✔
4932

4933
  SArray* list = NULL;
1✔
4934
  code = dbChkpGetDelta(p, initChkpId, list);
1✔
4935
  if (code != 0) {
1!
4936
    goto _EXIT;
×
4937
  }
4938
  *ppChkp = p;
1✔
4939
  return code;
1✔
4940
_EXIT:
×
4941
  dbChkpDestroy(p);
×
4942
  return code;
×
4943
}
4944

4945
void dbChkpDestroy(SDbChkp* pChkp) {
1✔
4946
  if (pChkp == NULL) return;
1!
4947

4948
  taosMemoryFree(pChkp->buf);
1!
4949
  taosMemoryFree(pChkp->path);
1!
4950

4951
  taosArrayDestroyP(pChkp->pSST, NULL);
1✔
4952
  taosArrayDestroyP(pChkp->pAdd, NULL);
1✔
4953
  taosArrayDestroyP(pChkp->pDel, NULL);
1✔
4954

4955
  taosHashCleanup(pChkp->pSstTbl[0]);
1✔
4956
  taosHashCleanup(pChkp->pSstTbl[1]);
1✔
4957

4958
  taosMemoryFree(pChkp->pCurrent);
1!
4959
  taosMemoryFree(pChkp->pManifest);
1!
4960
  taosMemoryFree(pChkp);
1!
4961
}
4962
#ifdef BUILD_NO_CALL
4963
int32_t dbChkpInit(SDbChkp* p) {
4964
  if (p == NULL) return 0;
4965
  return 0;
4966
}
4967
#endif
4968
int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
2✔
4969
  static char* chkpMeta = "META";
4970
  int32_t      code = 0;
2✔
4971

4972
  TAOS_UNUSED(taosThreadRwlockRdlock(&p->rwLock));
2✔
4973

4974
  int32_t cap = p->len + 128;
2✔
4975

4976
  char* buffer = taosMemoryCalloc(4, cap);
2!
4977
  if (buffer == NULL) {
2!
4978
    code = terrno;
×
4979
    goto _ERROR;
×
4980
  }
4981

4982
  char* srcBuf = buffer;
2✔
4983
  char* dstBuf = &srcBuf[cap];
2✔
4984
  char* srcDir = &dstBuf[cap];
2✔
4985
  char* dstDir = &srcDir[cap];
2✔
4986

4987
  int nBytes = snprintf(srcDir, cap, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP,
2✔
4988
                        "checkpoint", p->curChkpId);
4989
  if (nBytes <= 0 || nBytes >= cap) {
2!
4990
    code = TSDB_CODE_OUT_OF_RANGE;
×
4991
    goto _ERROR;
×
4992
  }
4993

4994
  nBytes = snprintf(dstDir, cap, "%s", dname);
2✔
4995
  if (nBytes <= 0 || nBytes >= cap) {
2!
4996
    code = TSDB_CODE_OUT_OF_RANGE;
×
4997
    goto _ERROR;
×
4998
  }
4999

5000
  if (!taosDirExist(srcDir)) {
2!
5001
    stError("failed to dump srcDir %s, reason: not exist such dir", srcDir);
×
5002
    code = TSDB_CODE_INVALID_PARA;
×
5003
    goto _ERROR;
×
5004
  }
5005
  int64_t chkpId = 0, processId = -1;
2✔
5006
  code = chkpLoadExtraInfo(srcDir, &chkpId, &processId);
2✔
5007
  if (code < 0) {
2!
5008
    stError("failed to load extra info from %s, reason:%s", srcDir, code != 0 ? "unkown" : tstrerror(code));
×
5009

5010
    goto _ERROR;
×
5011
  }
5012

5013
  // add file to $name dir
5014
  for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) {
8✔
5015
    memset(srcBuf, 0, cap);
6✔
5016
    memset(dstBuf, 0, cap);
6✔
5017

5018
    char* filename = taosArrayGetP(p->pAdd, i);
6✔
5019
    nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, filename);
6✔
5020
    if (nBytes <= 0 || nBytes >= cap) {
6!
5021
      code = TSDB_CODE_OUT_OF_RANGE;
×
5022
      goto _ERROR;
×
5023
    }
5024

5025
    nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, filename);
6✔
5026
    if (nBytes <= 0 || nBytes >= cap) {
6!
5027
      code = TSDB_CODE_OUT_OF_RANGE;
×
5028
      goto _ERROR;
×
5029
    }
5030

5031
    if (taosCopyFile(srcBuf, dstBuf) < 0) {
6!
5032
      code = TAOS_SYSTEM_ERROR(errno);
×
5033
      stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
×
5034
      goto _ERROR;
×
5035
    }
5036
  }
5037
  // del file in $name
5038
  for (int i = 0; i < taosArrayGetSize(p->pDel); i++) {
2!
5039
    char* filename = taosArrayGetP(p->pDel, i);
×
5040
    char* p = taosStrdup(filename);
×
5041
    if (p == NULL) {
×
5042
      code = terrno;
×
5043
      goto _ERROR;
×
5044
    }
5045
    if (taosArrayPush(list, &p) == NULL) {
×
5046
      taosMemoryFree(p);
×
5047
      code = terrno;
×
5048
      goto _ERROR;
×
5049
    }
5050
  }
5051

5052
  // copy current file to dst dir
5053
  memset(srcBuf, 0, cap);
2✔
5054
  memset(dstBuf, 0, cap);
2✔
5055

5056
  nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
2✔
5057
  if (nBytes <= 0 || nBytes >= cap) {
2!
5058
    code = TSDB_CODE_OUT_OF_RANGE;
×
5059
    goto _ERROR;
×
5060
  }
5061

5062
  nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId);
2✔
5063
  if (nBytes <= 0 || nBytes >= cap) {
2!
5064
    code = TSDB_CODE_OUT_OF_RANGE;
×
5065
    goto _ERROR;
×
5066
  }
5067

5068
  if (taosCopyFile(srcBuf, dstBuf) < 0) {
2!
5069
    code = TAOS_SYSTEM_ERROR(errno);
×
5070
    stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
×
5071
    goto _ERROR;
×
5072
  }
5073

5074
  // copy manifest file to dst dir
5075
  memset(srcBuf, 0, cap);
2✔
5076
  memset(dstBuf, 0, cap);
2✔
5077

5078
  nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
2✔
5079
  if (nBytes <= 0 || nBytes >= cap) {
2!
5080
    code = TSDB_CODE_OUT_OF_RANGE;
×
5081
    goto _ERROR;
×
5082
  }
5083

5084
  nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId);
2✔
5085
  if (nBytes <= 0 || nBytes >= cap) {
2!
5086
    code = TSDB_CODE_OUT_OF_RANGE;
×
5087
    goto _ERROR;
×
5088
  }
5089

5090
  if (taosCopyFile(srcBuf, dstBuf) < 0) {
2!
5091
    code = terrno;
×
5092
    stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
×
5093
    goto _ERROR;
×
5094
  }
5095
  memset(dstBuf, 0, cap);
2✔
5096
  nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta);
2✔
5097
  if (nBytes <= 0 || nBytes >= cap) {
2!
5098
    code = TSDB_CODE_OUT_OF_RANGE;
×
5099
    goto _ERROR;
×
5100
  }
5101

5102
  TdFilePtr pFile = taosOpenFile(dstBuf, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
2✔
5103
  if (pFile == NULL) {
2!
5104
    code = terrno;
×
5105
    stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(code));
×
5106
    goto _ERROR;
×
5107
  }
5108

5109
  char content[256] = {0};
2✔
5110
  nBytes = tsnprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest,
2✔
5111
                     p->curChkpId, "processVer", processId);
5112
  if (nBytes <= 0 || nBytes >= sizeof(content)) {
2!
5113
    code = TSDB_CODE_OUT_OF_RANGE;
×
5114
    stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir);
×
5115
    TAOS_UNUSED(taosCloseFile(&pFile));
×
5116
    goto _ERROR;
×
5117
  }
5118

5119
  nBytes = taosWriteFile(pFile, content, strlen(content));
2✔
5120
  if (nBytes != strlen(content)) {
2!
5121
    code = terrno;
×
5122
    stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(code));
×
5123
    TAOS_UNUSED(taosCloseFile(&pFile));
×
5124
    goto _ERROR;
×
5125
  }
5126
  TAOS_UNUSED(taosCloseFile(&pFile));
2✔
5127

5128
  // clear delta data buf
5129
  taosArrayClearP(p->pAdd, NULL);
2✔
5130
  taosArrayClearP(p->pDel, NULL);
2✔
5131
  code = 0;
2✔
5132

5133
_ERROR:
2✔
5134
  taosMemoryFree(buffer);
2!
5135
  TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
2✔
5136
  return code;
2✔
5137
}
5138

5139
int32_t bkdMgtCreate(char* path, SBkdMgt** mgt) {
10,088✔
5140
  int32_t  code = 0;
10,088✔
5141
  SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt));
10,088!
5142
  if (p == NULL) {
10,092!
5143
    return terrno;
×
5144
  }
5145

5146
  p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
10,092✔
5147
  if (p->pDbChkpTbl == NULL) {
10,092!
5148
    code = terrno;
×
5149
    bkdMgtDestroy(p);
×
5150
    return code;
×
5151
  }
5152

5153
  p->path = taosStrdup(path);
10,092!
5154
  if (p->path == NULL) {
10,092!
5155
    code = terrno;
×
5156
    bkdMgtDestroy(p);
×
5157
    return code;
×
5158
  }
5159

5160
  if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) {
10,092!
5161
    code = TAOS_SYSTEM_ERROR(errno);
×
5162
    bkdMgtDestroy(p);
×
5163
    return code;
×
5164
  }
5165
  *mgt = p;
10,092✔
5166

5167
  return code;
10,092✔
5168
}
5169

5170
void bkdMgtDestroy(SBkdMgt* bm) {
10,089✔
5171
  if (bm == NULL) return;
10,089!
5172
  void* pIter = taosHashIterate(bm->pDbChkpTbl, NULL);
10,089✔
5173
  while (pIter) {
10,090✔
5174
    SDbChkp* pChkp = *(SDbChkp**)(pIter);
1✔
5175
    dbChkpDestroy(pChkp);
1✔
5176

5177
    pIter = taosHashIterate(bm->pDbChkpTbl, pIter);
1✔
5178
  }
5179

5180
  TAOS_UNUSED(taosThreadRwlockDestroy(&bm->rwLock));
10,089✔
5181
  taosMemoryFree(bm->path);
10,089!
5182
  taosHashCleanup(bm->pDbChkpTbl);
10,090✔
5183

5184
  taosMemoryFree(bm);
10,090!
5185
}
5186
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* dname) {
2✔
5187
  int32_t code = 0;
2✔
5188
  TAOS_UNUSED(taosThreadRwlockWrlock(&bm->rwLock));
2✔
5189
  SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
2✔
5190
  SDbChkp*  pChkp = ppChkp != NULL ? *ppChkp : NULL;
2✔
5191

5192
  if (pChkp == NULL) {
2✔
5193
    int32_t cap = strlen(bm->path) + 64;
1✔
5194
    char*   path = taosMemoryCalloc(1, cap);
1!
5195
    if (path == NULL) {
1!
5196
      TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));
×
5197
      return terrno;
×
5198
    }
5199

5200
    int32_t nBytes = snprintf(path, cap, "%s%s%s", bm->path, TD_DIRSEP, taskId);
1✔
5201
    if (nBytes <= 0 || nBytes >= cap) {
1!
5202
      taosMemoryFree(path);
×
5203
      TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));
×
5204
      code = TSDB_CODE_OUT_OF_RANGE;
×
5205
      return code;
×
5206
    }
5207

5208
    SDbChkp* p = NULL;
1✔
5209
    code = dbChkpCreate(path, chkpId, &p);
1✔
5210
    if (code != 0) {
1!
5211
      taosMemoryFree(path);
×
5212
      TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));
×
5213
      return code;
×
5214
    }
5215

5216
    if (taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)) != 0) {
1!
5217
      dbChkpDestroy(p);
×
5218
      TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));
×
5219
      code = terrno;
×
5220
      return code;
×
5221
    }
5222

5223
    pChkp = p;
1✔
5224
    code = dbChkpDumpTo(pChkp, dname, list);
1✔
5225
    TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));
1✔
5226
    return code;
1✔
5227
  } else {
5228
    code = dbChkpGetDelta(pChkp, chkpId, NULL);
1✔
5229
    if (code == 0) {
1!
5230
      code = dbChkpDumpTo(pChkp, dname, list);
1✔
5231
    }
5232
  }
5233

5234
  TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));
1✔
5235
  return code;
1✔
5236
}
5237

5238
#ifdef BUILD_NO_CALL
5239
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) {
5240
  int32_t code = -1;
5241

5242
  taosThreadRwlockWrlock(&bm->rwLock);
5243
  SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
5244
  if (pp == NULL) {
5245
    SDbChkp* p = NULL;
5246
    code = dbChkpCreate(path, 0, &p);
5247
    if (code != 0) {
5248
      taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*));
5249
      code = 0;
5250
    }
5251
  } else {
5252
    stError("task chkp already exists");
5253
  }
5254

5255
  taosThreadRwlockUnlock(&bm->rwLock);
5256

5257
  return code;
5258
}
5259

5260
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) {
5261
  int32_t code = 0;
5262
  taosThreadRwlockRdlock(&bm->rwLock);
5263

5264
  SDbChkp* p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
5265
  code = dbChkpDumpTo(p, dname, NULL);
5266

5267
  taosThreadRwlockUnlock(&bm->rwLock);
5268
  return code;
5269
}
5270
#endif
5271

5272
SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
25✔
5273
  stDebug("streamStateSeekKeyPrev_rocksdb");
25!
5274
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
25✔
5275
  SStreamStateCur* pCur = createStreamStateCursor();
25✔
5276
  if (pCur == NULL) {
25!
5277
    return NULL;
×
5278
  }
5279

5280
  pCur->db = wrapper->db;
25✔
5281
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
50✔
5282
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
25✔
5283
  pCur->number = pState->number;
25✔
5284

5285
  char buf[128] = {0};
25✔
5286
  int  len = winKeyEncode((void*)key, buf);
25✔
5287
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
25✔
5288
    streamStateFreeCur(pCur);
23✔
5289
    return NULL;
23✔
5290
  }
5291
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
2!
5292
    rocksdb_iter_prev(pCur->iter);
×
5293
  }
5294

5295
  if (rocksdb_iter_valid(pCur->iter)) {
2!
5296
    SWinKey curKey;
5297
    size_t  kLen = 0;
2✔
5298
    char*   keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
2✔
5299
    TAOS_UNUSED(winKeyDecode((void*)&curKey, keyStr));
2✔
5300
    if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) {
2!
5301
      return pCur;
2✔
5302
    }
5303
    rocksdb_iter_prev(pCur->iter);
×
5304
    return pCur;
×
5305
  }
5306

5307
  streamStateFreeCur(pCur);
×
5308
  return NULL;
×
5309
}
5310

5311
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey,
25✔
5312
                                           const void** pVal, int32_t* pVLen) {
5313
  if (!pCur) {
25✔
5314
    return -1;
23✔
5315
  }
5316
  uint64_t groupId = pKey->groupId;
2✔
5317

5318
  int32_t code = streamStateGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
2✔
5319
  if (code == 0) {
2!
5320
    if (pKey->groupId == groupId) {
×
5321
      return 0;
×
5322
    }
5323
    if (pVal != NULL) {
×
5324
      taosMemoryFree((void*)*pVal);
×
5325
      *pVal = NULL;
×
5326
    }
5327
  }
5328
  return -1;
2✔
5329
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc