• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3768

28 Mar 2025 10:15AM UTC coverage: 33.726% (-0.3%) from 33.993%
#3768

push

travis-ci

happyguoxy
test:alter lcov result

144891 of 592084 branches covered (24.47%)

Branch coverage included in aggregate %.

218795 of 486283 relevant lines covered (44.99%)

765715.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.37
/source/libs/stream/src/streamBackendRocksdb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "lz4.h"
18
#include "streamInt.h"
19
#include "tcommon.h"
20
#include "tref.h"
21

22
/*
 * State object associated with a compaction-filter factory.
 * It carries a single opaque pointer; nothing in the visible part of this
 * file interprets it.
 */
typedef struct SCompactFilteFactory {
  void* status;  // opaque, caller-defined payload
} SCompactFilteFactory;
25

26
typedef struct {
27
  rocksdb_t*                       db;
28
  rocksdb_column_family_handle_t** pHandle;
29
  rocksdb_writeoptions_t*          wOpt;
30
  rocksdb_readoptions_t*           rOpt;
31
  rocksdb_options_t**              cfOpt;
32
  rocksdb_options_t*               dbOpt;
33
  RocksdbCfParam*                  param;
34
  void*                            pBackend;
35
  SListNode*                       pCompareNode;
36
  rocksdb_comparator_t**           pCompares;
37
} RocksdbCfInst;
38

39
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
40

41
void            destroyRocksdbCfInst(RocksdbCfInst* inst);
42
int32_t         getCfIdx(const char* cfName);
43
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath);
44

45
static int32_t backendCopyFiles(const char* src, const char* dst);
46

47
void        destroyCompactFilteFactory(void* arg);
48
void        destroyCompactFilte(void* arg);
49
const char* compactFilteFactoryName(void* arg);
50
const char* compactFilteFactoryNameSess(void* arg);
51
const char* compactFilteFactoryNameState(void* arg);
52
const char* compactFilteFactoryNameFunc(void* arg);
53
const char* compactFilteFactoryNameFill(void* arg);
54

55
const char* compactFilteName(void* arg);
56
const char* compactFilteNameSess(void* arg);
57
const char* compactFilteNameState(void* arg);
58
const char* compactFilteNameFill(void* arg);
59
const char* compactFilteNameFunc(void* arg);
60

61
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
62
                           char** newval, size_t* newvlen, unsigned char* value_changed);
63
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);
64
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx);
65
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx);
66
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx);
67
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx);
68

69
typedef int (*__db_key_encode_fn_t)(void* key, char* buf);
70
typedef int (*__db_key_decode_fn_t)(void* key, char* buf);
71
typedef int (*__db_key_tostr_fn_t)(void* key, char* buf);
72
typedef const char* (*__db_key_cmpname_fn_t)(void* statue);
73
typedef int (*__db_key_cmp_fn_t)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
74
typedef void (*__db_key_cmp_destroy_fn_t)(void* state);
75
typedef int32_t (*__db_value_encode_fn_t)(void* value, int32_t vlen, int64_t ttl, char** dest);
76
typedef int32_t (*__db_value_decode_fn_t)(void* value, int32_t vlen, int64_t* ttl, char** dest);
77

78
typedef rocksdb_compactionfilter_t* (*__db_factory_create_fn_t)(void* arg, rocksdb_compactionfiltercontext_t* ctx);
79
typedef const char* (*__db_factory_name_fn_t)(void* arg);
80
typedef void (*__db_factory_destroy_fn_t)(void* arg);
81
typedef struct {
82
  const char*               key;
83
  int32_t                   len;
84
  int                       idx;
85
  __db_key_cmp_fn_t         cmpKey;
86
  __db_key_encode_fn_t      enFunc;
87
  __db_key_decode_fn_t      deFunc;
88
  __db_key_tostr_fn_t       toStrFunc;
89
  __db_key_cmpname_fn_t     cmpName;
90
  __db_key_cmp_destroy_fn_t destroyCmp;
91
  __db_value_encode_fn_t    enValueFunc;
92
  __db_value_decode_fn_t    deValueFunc;
93

94
  __db_factory_create_fn_t  createFilter;
95
  __db_factory_destroy_fn_t destroyFilter;
96
  __db_factory_name_fn_t    funcName;
97

98
} SCfInit;
99

100
const char* compareDefaultName(void* name);
101
const char* compareStateName(void* name);
102
const char* compareWinKeyName(void* name);
103
const char* compareSessionKeyName(void* name);
104
const char* compareFuncKeyName(void* name);
105
const char* compareParKeyName(void* name);
106
const char* comparePartagKeyName(void* name);
107

108
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
109
int defaultKeyEncode(void* k, char* buf);
110
int defaultKeyDecode(void* k, char* buf);
111
int defaultKeyToString(void* k, char* buf);
112

113
int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
114
int stateKeyEncode(void* k, char* buf);
115
int stateKeyDecode(void* k, char* buf);
116
int stateKeyToString(void* k, char* buf);
117

118
int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
119
int stateSessionKeyEncode(void* ses, char* buf);
120
int stateSessionKeyDecode(void* ses, char* buf);
121
int stateSessionKeyToString(void* k, char* buf);
122

123
int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
124
int winKeyEncode(void* k, char* buf);
125
int winKeyDecode(void* k, char* buf);
126
int winKeyToString(void* k, char* buf);
127

128
int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
129
int tupleKeyEncode(void* k, char* buf);
130
int tupleKeyDecode(void* k, char* buf);
131
int tupleKeyToString(void* k, char* buf);
132

133
int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
134
int parKeyEncode(void* k, char* buf);
135
int parKeyDecode(void* k, char* buf);
136
int parKeyToString(void* k, char* buf);
137

138
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest);
139
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest);
140
int32_t valueToString(void* k, char* buf);
141
int32_t valueIsStale(void* k, int64_t ts);
142

143
void        destroyCompare(void* arg);
144
static void cleanDir(const char* pPath, const char* id);
145

146
static bool                streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
147
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
148
                                                 rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
149

150
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
151
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
152

153
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId);
154
int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId);
155

156
int32_t  copyFiles(const char* src, const char* dst);
157
uint32_t nextPow2(uint32_t x);
158

159
SCfInit ginitDict[] = {
160
    {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
161
     destroyCompare, valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory,
162
     compactFilteFactoryName},
163

164
    {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyCompare,
165
     valueEncode, valueDecode, compactFilteFactoryCreateFilterState, destroyCompactFilteFactory,
166
     compactFilteFactoryNameState},
167

168
    {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyCompare,
169
     valueEncode, valueDecode, compactFilteFactoryCreateFilterFill, destroyCompactFilteFactory,
170
     compactFilteFactoryNameFill},
171

172
    {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
173
     compareSessionKeyName, destroyCompare, valueEncode, valueDecode, compactFilteFactoryCreateFilterSess,
174
     destroyCompactFilteFactory, compactFilteFactoryNameSess},
175

176
    {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyCompare,
177
     valueEncode, valueDecode, compactFilteFactoryCreateFilterFunc, destroyCompactFilteFactory,
178
     compactFilteFactoryNameFunc},
179

180
    {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyCompare,
181
     valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
182

183
    {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyCompare,
184
     valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
185
};
186

187
int32_t getCfIdx(const char* cfName) {
55✔
188
  int    idx = -1;
55✔
189
  size_t len = strlen(cfName);
55✔
190
  for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
72!
191
    if (len == ginitDict[i].len && strncmp(cfName, ginitDict[i].key, strlen(cfName)) == 0) {
72✔
192
      idx = i;
55✔
193
      break;
55✔
194
    }
195
  }
196
  return idx;
55✔
197
}
198

199
/*
 * Placeholder for checkpoint-directory validation.
 * No validation is implemented yet: every directory is reported as valid.
 *
 * @param dir  checkpoint directory path (currently ignored).
 * @return always true.
 */
bool isValidCheckpoint(const char* dir) {
  (void)dir;  // not inspected until real validation is implemented
  return true;
}
203

204
/*
205
 *copy pChkpIdDir's file to state dir
206
 */
207
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
3✔
208
  // impl later
209
  int32_t code = 0;
3✔
210
  int32_t cap = strlen(path) + 64;
3✔
211
  int32_t nBytes = 0;
3✔
212

213
  char* state = taosMemoryCalloc(1, cap);
3!
214
  if (state == NULL) {
3!
215
    return terrno;
×
216
  }
217

218
  nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state");
3✔
219
  if (nBytes <= 0 || nBytes >= cap) {
3!
220
    taosMemoryFree(state);
×
221
    return TSDB_CODE_OUT_OF_RANGE;
×
222
  }
223

224
  if (chkpId != 0) {
3✔
225
    char* chkp = taosMemoryCalloc(1, cap);
2!
226
    if (chkp == NULL) {
2!
227
      taosMemoryFree(state);
×
228
      return terrno;
×
229
    }
230

231
    nBytes = snprintf(chkp, cap, "%s%s%s%scheckpoint%" PRId64, path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
2✔
232
    if (nBytes <= 0 || nBytes >= cap) {
2!
233
      taosMemoryFree(state);
×
234
      taosMemoryFree(chkp);
×
235
      return TSDB_CODE_OUT_OF_RANGE;
×
236
    }
237

238
    if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
2!
239
      cleanDir(state, "");
×
240
      code = backendCopyFiles(chkp, state);
×
241
      if (code != 0) {
×
242
        stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(code)));
×
243
      } else {
244
        stInfo("start to restart stream backend at checkpoint path: %s", chkp);
×
245
      }
246

247
    } else {
248
      stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp,
2!
249
              tstrerror(terrno), state);
250
      code = taosMkDir(state);
2✔
251
      if (code != 0) {
2!
252
        code = TAOS_SYSTEM_ERROR(ERRNO);
×
253
      }
254
    }
255

256
    taosMemoryFree(chkp);
2!
257
  }
258

259
  *dst = state;
3✔
260
  return code;
3✔
261
}
262

263
/*
 * Parsed content of a checkpoint META file: three (name, id) pairs, filled in
 * by sscanf() with META_ON_S3_FORMATE in remoteChkp_readMetaData().
 */
typedef struct {
  char    pCurrName[24];  // name token of the "current" entry
  int64_t currChkptId;    // checkpoint id recorded for the "current" entry

  char    pManifestName[24];  // name token of the "manifest" entry
  int64_t manifestChkptId;    // checkpoint id recorded for the "manifest" entry

  char    processName[24];  // name token of the "process" entry
  int64_t processId;        // id recorded for the "process" entry
} SSChkpMetaOnS3;
273

274
/*
 * Read and parse "<path>/META" into a freshly allocated SSChkpMetaOnS3.
 *
 * @param path   directory that contains the META file.
 * @param pMeta  receives the parsed record on success; the caller frees it
 *               with taosMemoryFree(). Left untouched on failure.
 * @return 0 on success; TSDB_CODE_OUT_OF_RANGE if the META path does not fit;
 *         TSDB_CODE_INVALID_MSG if the file is empty, cannot be parsed into
 *         six fields, or its current/manifest checkpoint ids disagree;
 *         otherwise the terrno of the failing allocation/open/read.
 */
int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) {
  int32_t         code = 0;
  int32_t         cap = strlen(path) + 32;
  TdFilePtr       pFile = NULL;
  SSChkpMetaOnS3* p = NULL;
  char            buf[256] = {0};

  char* metaPath = taosMemoryCalloc(1, cap);
  if (metaPath == NULL) {
    return terrno;
  }

  int32_t n = snprintf(metaPath, cap, "%s%s%s", path, TD_DIRSEP, "META");
  if (n <= 0 || n >= cap) {
    taosMemoryFree(metaPath);
    // a truncated path is a range problem, as in every other path builder here
    return TSDB_CODE_OUT_OF_RANGE;
  }

  // open the META file itself, not the directory that contains it
  pFile = taosOpenFile(metaPath, TD_FILE_READ);
  if (pFile == NULL) {
    code = terrno;
    goto _EXIT;
  }

  // leave the last byte untouched so buf is always NUL-terminated for sscanf
  if (taosReadFile(pFile, buf, sizeof(buf) - 1) <= 0) {
    // never report success without producing *pMeta
    code = (terrno != 0) ? terrno : TSDB_CODE_INVALID_MSG;
    goto _EXIT;
  }

  p = taosMemoryCalloc(1, sizeof(SSChkpMetaOnS3));
  if (p == NULL) {
    code = terrno;
    goto _EXIT;
  }

  // NOTE(review): the name fields are 24 bytes each; confirm META_ON_S3_FORMATE
  // bounds its string conversions accordingly.
  n = sscanf(buf, META_ON_S3_FORMATE, p->pCurrName, &p->currChkptId, p->pManifestName, &p->manifestChkptId,
             p->processName, &p->processId);
  if (n != 6 || p->currChkptId != p->manifestChkptId) {
    code = TSDB_CODE_INVALID_MSG;
    goto _EXIT;
  }

  *pMeta = p;
  p = NULL;  // ownership moved to the caller
  code = 0;

_EXIT:
  taosMemoryFree(p);
  taosCloseFile(&pFile);
  taosMemoryFree(metaPath);
  return code;
}
327

328
/*
 * Check that the META record matches chkpId, then rename the two files
 * "<path>/<name>_<currChkptId>" (for the current and manifest names) to
 * "<path>/<name>".
 *
 * @return 0 on success; TSDB_CODE_INVALID_CFG when either recorded id differs
 *         from chkpId; TSDB_CODE_INVALID_PARA when a name is empty;
 *         TSDB_CODE_OUT_OF_RANGE when a path does not fit; otherwise the
 *         error of the failing allocation/stat/rename.
 */
int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t chkpId) {
  int32_t       code = 0;
  int32_t       written = 0;
  const int32_t bufLen = strlen(path) + 64;

  char* from = taosMemoryCalloc(1, bufLen);
  char* to = taosMemoryCalloc(1, bufLen);
  if (from == NULL || to == NULL) {
    code = terrno;
    goto _EXIT;
  }

  if (!(pMeta->currChkptId == chkpId && pMeta->manifestChkptId == chkpId)) {
    code = TSDB_CODE_INVALID_CFG;
    goto _EXIT;
  }

  // pass 0 handles the "current" file, pass 1 the "manifest" file
  for (int pass = 0; pass < 2; ++pass) {
    char* fileKey = (pass != 0) ? pMeta->pManifestName : pMeta->pCurrName;
    if (fileKey[0] == '\0') {
      code = TSDB_CODE_INVALID_PARA;
      goto _EXIT;
    }

    written = snprintf(from, bufLen, "%s%s%s_%" PRId64, path, TD_DIRSEP, fileKey, pMeta->currChkptId);
    if (written <= 0 || written >= bufLen) {
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _EXIT;
    }

    // the suffixed file must exist before it can be renamed
    if (taosStatFile(from, NULL, NULL, NULL) != 0) {
      code = terrno;
      goto _EXIT;
    }

    written = snprintf(to, bufLen, "%s%s%s", path, TD_DIRSEP, fileKey);
    if (written <= 0 || written >= bufLen) {
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _EXIT;
    }

    code = taosRenameFile(from, to);
    if (code != 0) {
      goto _EXIT;
    }

    memset(from, 0, bufLen);
    memset(to, 0, bufLen);
  }
  code = 0;

_EXIT:
  taosMemoryFree(from);
  taosMemoryFree(to);
  return code;
}
385
/*
 * Append to toDel the two file names "<name>_<currChkptId>" derived from the
 * META record found under path (one for the current name, one for the
 * manifest name).
 *
 * @param path   directory containing the META file.
 * @param toDel  array of char*; each pushed string is heap-allocated and is
 *               owned by the array after a successful push.
 * @return 0 on success, otherwise an error code. Strings pushed before a
 *         failure stay in toDel.
 */
int32_t remoteChkpGetDelFile(char* path, SArray* toDel) {
  int32_t code = 0;
  int32_t nBytes = 0;

  SSChkpMetaOnS3* pMeta = NULL;
  code = remoteChkp_readMetaData(path, &pMeta);
  if (code != 0) {
    return code;
  }

  for (int i = 0; i < 2; i++) {
    char* key = (i == 0 ? pMeta->pCurrName : pMeta->pManifestName);

    int32_t cap = strlen(key) + 32;
    char*   p = taosMemoryCalloc(1, cap);
    if (p == NULL) {
      code = terrno;
      break;
    }

    nBytes = snprintf(p, cap, "%s_%" PRId64, key, pMeta->currChkptId);
    if (nBytes <= 0 || nBytes >= cap) {
      taosMemoryFree(p);
      code = TSDB_CODE_OUT_OF_RANGE;
      break;
    }

    if (taosArrayPush(toDel, &p) == NULL) {
      code = terrno;  // capture before any free can disturb it
      taosMemoryFree(p);
      break;
    }
  }

  // the META record is only needed locally; release it on every path
  taosMemoryFree(pMeta);
  return code;
}
420

421
/*
 * Empty an existing directory by removing it and creating it again.
 * Does nothing when pPath is not a directory.
 *
 * @param pPath  directory to clear; NULL is logged and ignored.
 * @param id     prefix used in log messages.
 */
void cleanDir(const char* pPath, const char* id) {
  if (pPath == NULL) {
    stError("%s try to clean dir, but path is NULL", id);
    return;
  }

  if (taosIsDir(pPath)) {
    taosRemoveDir(pPath);
    if (taosMkDir(pPath) != 0) {
      stError("%s failed to create dir:%s", id, pPath);
    } else {
      // report success only when the directory really exists again
      stInfo("%s clear dir:%s, succ", id, pPath);
    }
  }
}
435

436
/*
 * Make sure pPath exists as a directory.
 *
 * @return 0 when it already is a directory, otherwise the result of
 *         taosMulMkDir(pPath).
 */
int32_t createDirIfNotExist(const char* pPath) {
  if (taosIsDir(pPath)) {
    return 0;
  }
  return taosMulMkDir(pPath);
}
443

444
/*
 * Rebuild the default state directory from a checkpoint fetched through
 * streamTaskDownloadCheckpointData().
 *
 * Any local copy of the checkpoint directory is removed and the default
 * directory is cleared before the download; afterwards the downloaded files
 * are copied into the default directory.
 *
 * @return 0 on success, the download error, or the copy error.
 */
int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) {
  // drop stale local checkpoint data
  if (taosIsDir(checkpointPath)) {
    taosRemoveDir(checkpointPath);
    stDebug("remove local checkpoint data dir:%s succ", checkpointPath);
  }

  cleanDir(defaultPath, key);
  stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);

  int32_t rc = streamTaskDownloadCheckpointData(key, checkpointPath, checkpointId);
  if (rc != 0) {
    stError("failed to download checkpoint data:%s", key);
    return rc;
  }

  stDebug("download remote checkpoint data for checkpointId:%" PRId64 ", %s", checkpointId, key);
  return backendCopyFiles(checkpointPath, defaultPath);
}
463

464
int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) {
×
465
  SSChkpMetaOnS3* pMeta = NULL;
×
466

467
  int32_t code = remoteChkp_readMetaData(chkpPath, &pMeta);
×
468
  if (code != 0) {
×
469
    return code;
×
470
  }
471

472
  if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) {
×
473
    taosMemoryFree(pMeta);
×
474
    return TSDB_CODE_INVALID_PARA;
×
475
  }
476

477
  code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId);
×
478
  if (code != 0) {
×
479
    taosMemoryFree(pMeta);
×
480
    return code;
×
481
  }
482
  taosMemoryFree(pMeta);
×
483

484
  return chkpAddExtraInfo(chkpPath, chkpId, pMeta->processId);
×
485
}
486

487
int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
×
488
  int8_t  rename = 0;
×
489
  int32_t code = streamTaskDownloadCheckpointData(key, chkpPath, chkpId);
×
490
  if (code != 0) {
×
491
    return code;
×
492
  }
493

494
  int32_t cap = strlen(defaultPath) + 32;
×
495

496
  char* defaultTmp = taosMemoryCalloc(1, cap);
×
497
  if (defaultTmp == NULL) {
×
498
    return terrno;
×
499
  }
500

501
  int32_t nBytes = snprintf(defaultPath, cap, "%s%s", defaultPath, "_tmp");
×
502
  if (nBytes <= 0 || nBytes >= cap) {
×
503
    taosMemoryFree(defaultPath);
×
504
    return TSDB_CODE_OUT_OF_RANGE;
×
505
  }
506

507
  if (taosIsDir(defaultTmp)) taosRemoveDir(defaultTmp);
×
508
  if (taosIsDir(defaultPath)) {
×
509
    code = taosRenameFile(defaultPath, defaultTmp);
×
510
    if (code != 0) {
×
511
      goto _EXIT;
×
512
    } else {
513
      rename = 1;
×
514
    }
515
  } else {
516
    code = taosMkDir(defaultPath);
×
517
    if (code != 0) {
×
518
      code = TAOS_SYSTEM_ERROR(ERRNO);
×
519
      goto _EXIT;
×
520
    }
521
  }
522

523
  code = rebuildDataFromS3(chkpPath, chkpId);
×
524
  if (code != 0) {
×
525
    goto _EXIT;
×
526
  }
527

528
  code = backendCopyFiles(chkpPath, defaultPath);
×
529
  if (code != 0) {
×
530
    goto _EXIT;
×
531
  }
532
  code = 0;
×
533

534
_EXIT:
×
535
  if (code != 0) {
×
536
    if (rename) {
×
537
      TAOS_UNUSED(taosRenameFile(defaultTmp, defaultPath));
×
538
    }
539
  }
540

541
  if (taosIsDir(defaultPath)) {
×
542
    taosRemoveDir(defaultPath);
×
543
  }
544

545
  taosMemoryFree(defaultTmp);
×
546
  return code;
×
547
}
548

549
int32_t rebuildFromRemoteCheckpoint(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) {
×
550
  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
×
551
  if (type == DATA_UPLOAD_S3) {
×
552
    return rebuildFromRemoteChkp_s3(key, checkpointPath, checkpointId, defaultPath);
×
553
  } else if (type == DATA_UPLOAD_RSYNC) {
×
554
    return rebuildFromRemoteChkp_rsync(key, checkpointPath, checkpointId, defaultPath);
×
555
  } else {
556
    stError("%s no remote backup checkpoint data for:%" PRId64, key, checkpointId);
×
557
  }
558

559
  return -1;
×
560
}
561

562
int32_t copyFiles_create(char* src, char* dst, int8_t type) {
6✔
563
  // create and copy file
564
  int32_t err = taosCopyFile(src, dst);
6✔
565

566
  if (ERRNO == EXDEV || ERRNO == ENOTSUP) {
6!
567
    SET_ERRNO(0);
×
568
    return 0;
×
569
  }
570
  return 0;
6✔
571
}
572
/*
 * Create dst as a hard link to src (both must live on the same file system).
 *
 * @param type  unused.
 * @return the result of taosLinkFile().
 */
int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) {
  int32_t rc = taosLinkFile(src, dst);
  return rc;
}
576

577
int32_t backendFileCopyFilesImpl(const char* src, const char* dst) {
3✔
578
  const char* current = "CURRENT";
3✔
579
  size_t      currLen = strlen(current);
3✔
580

581
  const char* info = "info";
3✔
582
  size_t      infoLen = strlen(info);
3✔
583

584
  int32_t code = 0;
3✔
585
  int32_t sLen = strlen(src);
3✔
586
  int32_t dLen = strlen(dst);
3✔
587
  int32_t cap = TMAX(sLen, dLen) + 64;
3✔
588
  int32_t nBytes = 0;
3✔
589

590
  char* srcName = taosMemoryCalloc(1, cap);
3!
591
  char* dstName = taosMemoryCalloc(1, cap);
3!
592
  if (srcName == NULL || dstName == NULL) {
3!
593
    taosMemoryFree(srcName);
×
594
    taosMemoryFree(dstName);
×
595
    return terrno;
×
596
  }
597

598
  // copy file to dst
599
  TdDirPtr pDir = taosOpenDir(src);
3✔
600
  if (pDir == NULL) {
3!
601
    code = terrno;
×
602
    goto _ERROR;
×
603
  }
604

605
  SET_ERRNO(0);
3✔
606
  TdDirEntryPtr de = NULL;
3✔
607
  while ((de = taosReadDir(pDir)) != NULL) {
27✔
608
    char* name = taosGetDirEntryName(de);
24✔
609
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) {
24✔
610
      continue;
6✔
611
    }
612

613
    nBytes = snprintf(srcName, cap, "%s%s%s", src, TD_DIRSEP, name);
18✔
614
    if (nBytes <= 0 || nBytes >= cap) {
18!
615
      code = TSDB_CODE_OUT_OF_RANGE;
×
616
      goto _ERROR;
×
617
    }
618

619
    nBytes = snprintf(dstName, cap, "%s%s%s", dst, TD_DIRSEP, name);
18✔
620
    if (nBytes <= 0 || nBytes >= cap) {
18!
621
      code = TSDB_CODE_OUT_OF_RANGE;
×
622
      goto _ERROR;
×
623
    }
624

625
    if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) {
18✔
626
      code = copyFiles_create(srcName, dstName, 0);
3✔
627
      if (code != 0) {
3!
628
        code = TAOS_SYSTEM_ERROR(ERRNO);
×
629
        stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
×
630
        goto _ERROR;
×
631
      }
632
    } else if (strncmp(name, info, strlen(name) <= infoLen ? strlen(name) : infoLen) == 0) {
15✔
633
      code = copyFiles_create(srcName, dstName, 0);
3✔
634
      if (code != 0) {
3!
635
        code = TAOS_SYSTEM_ERROR(ERRNO);
×
636
        stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
×
637
        goto _ERROR;
×
638
      }
639

640
    } else {
641
      code = copyFiles_hardlink(srcName, dstName, 0);
12✔
642
      if (code != 0) {
12!
643
        code = TAOS_SYSTEM_ERROR(ERRNO);
×
644
        stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code));
×
645
        goto _ERROR;
×
646
      } else {
647
        stDebug("succ hard link file:%s to %s", srcName, dstName);
12!
648
      }
649
    }
650

651
    memset(srcName, 0, cap);
18✔
652
    memset(dstName, 0, cap);
18✔
653
  }
654

655
  taosMemoryFreeClear(srcName);
3!
656
  taosMemoryFreeClear(dstName);
3!
657
  TAOS_UNUSED(taosCloseDir(&pDir));
3✔
658
  return code;
3✔
659

660
_ERROR:
×
661
  taosMemoryFreeClear(srcName);
×
662
  taosMemoryFreeClear(dstName);
×
663
  TAOS_UNUSED(taosCloseDir(&pDir));
×
664
  return code;
×
665
}
666

667
/*
 * Copy/link the files of directory src into directory dst.
 * Thin wrapper around backendFileCopyFilesImpl(); returns its result.
 */
int32_t backendCopyFiles(const char* src, const char* dst) {
  int32_t rc = backendFileCopyFilesImpl(src, dst);
  return rc;
}
3✔
668

669
static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId,
3✔
670
                                          const char* defaultPath, int64_t* processVer) {
671
  int32_t code = 0;
3✔
672
  cleanDir(defaultPath, pTaskIdStr);
3✔
673

674
  if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) {
3!
675
    stDebug("%s local checkpoint data existed, checkpointId:%" PRId64 " copy to backend dir", pTaskIdStr, checkpointId);
3!
676

677
    code = backendCopyFiles(checkpointPath, defaultPath);
3✔
678
    if (code != TSDB_CODE_SUCCESS) {
3!
679
      cleanDir(defaultPath, pTaskIdStr);
×
680
      stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote",
×
681
              pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(code)));
682
      code = TSDB_CODE_SUCCESS;
×
683
    } else {
684
      stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath,
3!
685
             defaultPath);
686
    }
687
  } else {
688
    code = terrno;
×
689
    stError("%s no valid data for checkpointId:%" PRId64 " in %s", pTaskIdStr, checkpointId, checkpointPath);
×
690
  }
691

692
  return code;
3✔
693
}
694

695
int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath,
22✔
696
                              int64_t* processVer) {
697
  int32_t code = 0;
22✔
698

699
  char* prefixPath = NULL;
22✔
700
  char* defaultPath = NULL;
22✔
701
  char* checkpointPath = NULL;
22✔
702
  char* checkpointRoot = NULL;
22✔
703

704
  int32_t cap = strlen(path) + 128;
22✔
705
  int32_t nBytes;
706

707
  // alloc buf
708
  prefixPath = taosMemoryCalloc(1, cap);
22!
709
  defaultPath = taosMemoryCalloc(1, cap);
22!
710
  checkpointPath = taosMemoryCalloc(1, cap);
22!
711
  checkpointRoot = taosMemoryCalloc(1, cap);
22!
712
  if (prefixPath == NULL || defaultPath == NULL || checkpointPath == NULL || checkpointRoot == NULL) {
22!
713
    code = terrno;
×
714
    goto _EXIT;
×
715
  }
716

717
  nBytes = snprintf(prefixPath, cap, "%s%s%s", path, TD_DIRSEP, key);
22✔
718
  if (nBytes <= 0 || nBytes >= cap) {
22!
719
    code = TSDB_CODE_OUT_OF_RANGE;
×
720
    goto _EXIT;
×
721
  }
722

723
  code = createDirIfNotExist(prefixPath);
22✔
724
  if (code != 0) {
22!
725
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
726
    goto _EXIT;
×
727
  }
728

729
  nBytes = snprintf(defaultPath, cap, "%s%s%s", prefixPath, TD_DIRSEP, "state");
22✔
730
  if (nBytes <= 0 || nBytes >= cap) {
22!
731
    code = TSDB_CODE_OUT_OF_RANGE;
×
732
    goto _EXIT;
×
733
  }
734

735
  code = createDirIfNotExist(defaultPath);
22✔
736
  if (code != 0) {
22!
737
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
738
    goto _EXIT;
×
739
  }
740

741
  nBytes = snprintf(checkpointRoot, cap, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints");
22✔
742
  if (nBytes <= 0 || nBytes >= cap) {
22!
743
    code = TSDB_CODE_OUT_OF_RANGE;
×
744
    goto _EXIT;
×
745
  }
746

747
  code = createDirIfNotExist(checkpointRoot);
22✔
748
  if (code != 0) {
22!
749
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
750
    goto _EXIT;
×
751
  }
752

753
  stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId);
22✔
754
  if (chkptId > 0) {
22✔
755
    nBytes = snprintf(checkpointPath, cap, "%s%s%s%s%s%" PRId64, prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP,
3✔
756
                      "checkpoint", chkptId);
757
    if (nBytes <= 0 || nBytes >= cap) {
3!
758
      code = TSDB_CODE_OUT_OF_RANGE;
×
759
      goto _EXIT;
×
760
    }
761

762
    code = rebuildFromLocalCheckpoint(key, checkpointPath, chkptId, defaultPath, processVer);
3✔
763
    if (code != 0) {
3!
764
      code = rebuildFromRemoteCheckpoint(key, checkpointPath, chkptId, defaultPath);
×
765
    }
766

767
    if (code != 0) {
3!
768
      stError("failed to start stream backend at %s, restart from defaultPath:%s, reason:%s", checkpointPath,
×
769
              defaultPath, tstrerror(code));
770
      code = 0;  // reset the error code
×
771
    }
772
  } else {  // no valid checkpoint id
773
    stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key,
19!
774
           defaultPath);
775
    cleanDir(defaultPath, key);
19✔
776
  }
777

778
  *dbPath = defaultPath;
22✔
779
  *dbPrefixPath = prefixPath;
22✔
780
  defaultPath = NULL;
22✔
781
  prefixPath = NULL;
22✔
782

783
  code = 0;
22✔
784

785
_EXIT:
22✔
786
  taosMemoryFree(defaultPath);
22!
787
  taosMemoryFree(prefixPath);
22!
788
  taosMemoryFree(checkpointPath);
22!
789
  taosMemoryFree(checkpointRoot);
22!
790
  return code;
22✔
791
}
792
bool streamBackendDataIsExist(const char* path, int64_t chkpId) {
2✔
793
  bool    exist = true;
2✔
794
  int32_t cap = strlen(path) + 32;
2✔
795

796
  char* state = taosMemoryCalloc(1, cap);
2!
797
  if (state == NULL) {
2!
798
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
799
    return false;
×
800
  }
801

802
  int16_t nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state");
2✔
803
  if (nBytes <= 0 || nBytes >= cap) {
2!
804
    terrno = TSDB_CODE_OUT_OF_RANGE;
×
805
    exist = false;
×
806
  } else {
807
    if (!taosDirExist(state)) {
2✔
808
      exist = false;
1✔
809
    }
810
  }
811

812
  taosMemoryFree(state);
2!
813
  return exist;
2✔
814
}
815

816
/*
 * Create and open the rocksdb-based stream backend for one vnode.
 *
 * streamPath: stream root directory handed to rebuildDirFromCheckpoint().
 * chkpId:     checkpoint id handed to rebuildDirFromCheckpoint().
 * vgId:       vnode group id stored in the wrapper and used in log lines.
 * pBackend:   out; receives the new wrapper on success, NULL on failure.
 *
 * Returns 0 on success, otherwise an error code. On failure every resource
 * created here is released before returning.
 */
int32_t streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId, SBackendWrapper** pBackend) {
  char*    backendPath = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;
  char*    err = NULL;
  size_t   nCf = 0;
  char**   cfs = NULL;
  uint32_t dbMemLimit = 0;

  // Everything released at _EXIT is declared and nulled up front, so an early
  // `goto _EXIT` never sees an indeterminate pointer.
  SBackendWrapper*   pHandle = NULL;
  rocksdb_env_t*     env = NULL;
  rocksdb_cache_t*   cache = NULL;
  rocksdb_options_t* opts = NULL;
  bool               mutexReady = false;
  bool               cfMutexReady = false;

  *pBackend = NULL;

  code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
  TSDB_CHECK_CODE(code, lino, _EXIT);

  stDebug("start to init stream backend:%s, checkpointId:%" PRId64 " vgId:%d", backendPath, chkpId, vgId);

  dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
  pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
  TSDB_CHECK_NULL(pHandle, code, lino, _EXIT, terrno);

  pHandle->list = tdListNew(sizeof(SCfComparator));
  TSDB_CHECK_NULL(pHandle->list, code, lino, _EXIT, terrno);

  code = taosThreadMutexInit(&pHandle->mutex, NULL);
  TSDB_CHECK_CODE(code, lino, _EXIT);
  mutexReady = true;

  code = taosThreadMutexInit(&pHandle->cfMutex, NULL);
  TSDB_CHECK_CODE(code, lino, _EXIT);
  cfMutexReady = true;

  pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pHandle->cfInst, code, lino, _EXIT, terrno);

  pHandle->vgId = vgId;

  env = rocksdb_create_default_env();  // rocksdb_envoptions_create();

  int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2;
  rocksdb_env_set_low_priority_background_threads(env, nBGThread);
  rocksdb_env_set_high_priority_background_threads(env, nBGThread);

  cache = rocksdb_cache_create_lru(dbMemLimit / 2);

  opts = rocksdb_options_create();
  rocksdb_options_set_env(opts, env);
  rocksdb_options_set_create_if_missing(opts, 1);
  rocksdb_options_set_create_missing_column_families(opts, 1);
  rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
  rocksdb_options_set_recycle_log_file_num(opts, 6);
  rocksdb_options_set_max_write_buffer_number(opts, 3);
  rocksdb_options_set_info_log_level(opts, 1);
  rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit);
  rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2);
  rocksdb_options_set_atomic_flush(opts, 1);

  pHandle->env = env;
  pHandle->dbOpt = opts;
  pHandle->cache = cache;
  pHandle->filterFactory = rocksdb_compactionfilterfactory_create(
      NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
  rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory);

  cfs = rocksdb_list_column_families(opts, backendPath, &nCf, &err);
  if (nCf == 0 || nCf == 1 || err != NULL) {
    // No database yet, or only the default column family: open it plainly.
    taosMemoryFreeClear(err);
    pHandle->db = rocksdb_open(opts, backendPath, &err);
    if (err != NULL) {
      stError("failed to open rocksdb, path:%s, reason:%s", backendPath, err);
      taosMemoryFreeClear(err);
      // Report the failure; previously this path returned 0 with *pBackend == NULL.
      code = TSDB_CODE_THIRDPARTY_ERROR;
      lino = __LINE__;
      goto _EXIT;
    }
  } else {
    /*
      list all cf and get prefix
    */
    code = streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf);
    TSDB_CHECK_CODE(code, lino, _EXIT);
  }

  if (cfs != NULL) {
    rocksdb_list_column_families_destroy(cfs, nCf);
  }

  stDebug("init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId);
  taosMemoryFreeClear(backendPath);

  *pBackend = pHandle;
  return code;

_EXIT:
  if (cfs != NULL) {
    rocksdb_list_column_families_destroy(cfs, nCf);
  }
  // The rocksdb destroy functions are only called on objects that were created.
  if (opts != NULL) rocksdb_options_destroy(opts);
  if (cache != NULL) rocksdb_cache_destroy(cache);
  if (env != NULL) rocksdb_env_destroy(env);
  if (pHandle != NULL) {
    if (mutexReady) streamMutexDestroy(&pHandle->mutex);
    if (cfMutexReady) streamMutexDestroy(&pHandle->cfMutex);
    taosHashCleanup(pHandle->cfInst);
    pHandle->list = tdListFree(pHandle->list);
    taosMemoryFree(pHandle);
  }
  stDebug("failed to init stream backend at %s, vgId:%d line:%d code:%s", backendPath != NULL ? backendPath : "(null)",
          vgId, lino, tstrerror(code));
  taosMemoryFree(backendPath);
  return code;
}
919

920
void streamBackendCleanup(void* arg) {
3✔
921
  SBackendWrapper* pHandle = (SBackendWrapper*)arg;
3✔
922

923
  void* pIter = taosHashIterate(pHandle->cfInst, NULL);
3✔
924
  while (pIter != NULL) {
5✔
925
    RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
2✔
926
    destroyRocksdbCfInst(inst);
2✔
927
    pIter = taosHashIterate(pHandle->cfInst, pIter);
2✔
928
  }
929

930
  taosHashCleanup(pHandle->cfInst);
3✔
931

932
  if (pHandle->db) {
3!
933
    rocksdb_close(pHandle->db);
3✔
934
    pHandle->db = NULL;
3✔
935
  }
936

937
  rocksdb_options_destroy(pHandle->dbOpt);
3✔
938
  rocksdb_env_destroy(pHandle->env);
3✔
939
  rocksdb_cache_destroy(pHandle->cache);
3✔
940

941
  SListNode* head = tdListPopHead(pHandle->list);
3✔
942
  while (head != NULL) {
5✔
943
    streamStateDestroyCompar(head->data);
2✔
944
    taosMemoryFree(head);
2!
945
    head = tdListPopHead(pHandle->list);
2✔
946
  }
947

948
  pHandle->list = tdListFree(pHandle->list);
3✔
949
  streamMutexDestroy(&pHandle->mutex);
3✔
950

951
  streamMutexDestroy(&pHandle->cfMutex);
3✔
952
  stDebug("vgId:%d destroy stream backend:%p", (int32_t)pHandle->vgId, pHandle);
3✔
953
  taosMemoryFree(pHandle);
3!
954
}
3✔
955

956
void streamBackendHandleCleanup(void* arg) {
×
957
  SBackendCfWrapper* wrapper = arg;
×
958
  bool               remove = wrapper->remove;
×
959
  TAOS_UNUSED(taosThreadRwlockWrlock(&wrapper->rwLock));
×
960

961
  stDebug("start to do-close backendWrapper %p, %s", wrapper, wrapper->idstr);
×
962
  if (wrapper->rocksdb == NULL) {
×
963
    TAOS_UNUSED(taosThreadRwlockUnlock(&wrapper->rwLock));
×
964
    return;
×
965
  }
966

967
  int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
×
968

969
  char* err = NULL;
×
970
  if (remove) {
×
971
    for (int i = 0; i < cfLen; i++) {
×
972
      if (wrapper->pHandle[i] != NULL) rocksdb_drop_column_family(wrapper->rocksdb, wrapper->pHandle[i], &err);
×
973
      if (err != NULL) {
×
974
        stError("failed to drop cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
×
975
        taosMemoryFreeClear(err);
×
976
      }
977
    }
978
  } else {
979
    rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
×
980
    rocksdb_flushoptions_set_wait(flushOpt, 1);
×
981

982
    for (int i = 0; i < cfLen; i++) {
×
983
      if (wrapper->pHandle[i] != NULL) rocksdb_flush_cf(wrapper->rocksdb, flushOpt, wrapper->pHandle[i], &err);
×
984
      if (err != NULL) {
×
985
        stError("failed to flush cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
×
986
        taosMemoryFreeClear(err);
×
987
      }
988
    }
989
    rocksdb_flushoptions_destroy(flushOpt);
×
990
  }
991

992
  for (int i = 0; i < cfLen; i++) {
×
993
    if (wrapper->pHandle[i] != NULL) {
×
994
      rocksdb_column_family_handle_destroy(wrapper->pHandle[i]);
×
995
    }
996
  }
997
  taosMemoryFreeClear(wrapper->pHandle);
×
998

999
  for (int i = 0; i < cfLen; i++) {
×
1000
    rocksdb_options_destroy(wrapper->cfOpts[i]);
×
1001
    rocksdb_block_based_options_destroy(((RocksdbCfParam*)wrapper->param)[i].tableOpt);
×
1002
  }
1003

1004
  if (remove) {
×
1005
    streamBackendDelCompare(wrapper->pBackend, wrapper->pComparNode);
×
1006
  }
1007
  rocksdb_writeoptions_destroy(wrapper->writeOpts);
×
1008
  wrapper->writeOpts = NULL;
×
1009

1010
  rocksdb_readoptions_destroy(wrapper->readOpts);
×
1011
  wrapper->readOpts = NULL;
×
1012
  taosMemoryFreeClear(wrapper->cfOpts);
×
1013
  taosMemoryFreeClear(wrapper->param);
×
1014
  TAOS_UNUSED(taosThreadRwlockUnlock(&wrapper->rwLock));
×
1015

1016
  TAOS_UNUSED(taosThreadRwlockDestroy(&wrapper->rwLock));
×
1017
  wrapper->rocksdb = NULL;
×
1018
  // taosReleaseRef(streamBackendId, wrapper->backendId);
1019

1020
  stDebug("end to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
×
1021
  taosMemoryFree(wrapper);
×
1022
  return;
×
1023
}
1024

1025
#ifdef BUILD_NO_CALL
1026
int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) {
1027
  SStreamMeta* pMeta = arg;
1028
  taosWLockLatch(&pMeta->chkpDirLock);
1029
  int64_t tc = 0;
1030
  int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
1031
  if (sz <= 0) {
1032
    taosWUnLockLatch(&pMeta->chkpDirLock);
1033
    return -1;
1034
  } else {
1035
    tc = *(int64_t*)taosArrayGetLast(pMeta->chkpSaved);
1036
  }
1037

1038
  taosArrayPush(pMeta->chkpInUse, &tc);
1039

1040
  *checkpoint = tc;
1041
  taosWUnLockLatch(&pMeta->chkpDirLock);
1042
  return 0;
1043
}
1044
/*
1045
 *  checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
1046
 *  chkpInUse: |--cp2--|--cp4--|
1047
 *  chkpInUse is doing translation, cannot del until
1048
 *  replication is finished
1049
 */
1050
int32_t delObsoleteCheckpoint(void* arg, const char* path) {
1051
  SStreamMeta* pMeta = arg;
1052

1053
  taosWLockLatch(&pMeta->chkpDirLock);
1054

1055
  SArray* chkpDel = taosArrayInit(10, sizeof(int64_t));
1056
  SArray* chkpDup = taosArrayInit(10, sizeof(int64_t));
1057

1058
  int64_t firsId = 0;
1059
  if (taosArrayGetSize(pMeta->chkpInUse) >= 1) {
1060
    firsId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
1061

1062
    for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) {
1063
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1064
      if (id >= firsId) {
1065
        taosArrayPush(chkpDup, &id);
1066
      } else {
1067
        taosArrayPush(chkpDel, &id);
1068
      }
1069
    }
1070
  } else {
1071
    int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
1072
    int32_t dsz = sz - pMeta->chkpCap;  // del size
1073

1074
    for (int i = 0; i < dsz; i++) {
1075
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1076
      taosArrayPush(chkpDel, &id);
1077
    }
1078
    for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
1079
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1080
      taosArrayPush(chkpDup, &id);
1081
    }
1082
  }
1083
  taosArrayDestroy(pMeta->chkpSaved);
1084
  pMeta->chkpSaved = chkpDup;
1085

1086
  taosWUnLockLatch(&pMeta->chkpDirLock);
1087

1088
  for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
1089
    int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
1090
    char    tbuf[256] = {0};
1091
    sprintf(tbuf, "%s%scheckpoint%" PRId64, path, TD_DIRSEP, id);
1092
    if (taosIsDir(tbuf)) {
1093
      taosRemoveDir(tbuf);
1094
    }
1095
  }
1096
  taosArrayDestroy(chkpDel);
1097
  return 0;
1098
}
1099
#endif
1100
/*
1101
 *  checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
1102
 *  chkpInUse: |--cp2--|--cp4--|
1103
 *  chkpInUse is doing translation, cannot del until
1104
 *  replication is finished
1105
 */
1106
int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
8✔
1107
  int32_t         code = 0;
8✔
1108
  STaskDbWrapper* pBackend = arg;
8✔
1109
  SArray *        chkpDel = NULL, *chkpDup = NULL;
8✔
1110
  TAOS_UNUSED(taosThreadRwlockWrlock(&pBackend->chkpDirLock));
8✔
1111

1112
  if (taosArrayPush(pBackend->chkpSaved, &chkpId) == NULL) {
16!
1113
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1114
  }
1115

1116
  chkpDel = taosArrayInit(8, sizeof(int64_t));
8✔
1117
  if (chkpDel == NULL) {
8!
1118
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1119
  }
1120

1121
  chkpDup = taosArrayInit(8, sizeof(int64_t));
8✔
1122
  if (chkpDup == NULL) {
8!
1123
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1124
  }
1125

1126
  int64_t firsId = 0;
8✔
1127
  if (taosArrayGetSize(pBackend->chkpInUse) >= 1) {
8!
1128
    firsId = *(int64_t*)taosArrayGet(pBackend->chkpInUse, 0);
×
1129

1130
    for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) {
×
1131
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
×
1132
      if (id >= firsId) {
×
1133
        if (taosArrayPush(chkpDup, &id) == NULL) {
×
1134
          TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1135
        }
1136
      } else {
1137
        if (taosArrayPush(chkpDel, &id) == NULL) {
×
1138
          TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1139
        }
1140
      }
1141
    }
1142
  } else {
1143
    int32_t sz = taosArrayGetSize(pBackend->chkpSaved);
8✔
1144
    int32_t dsz = sz - pBackend->chkpCap;  // del size
8✔
1145

1146
    for (int i = 0; i < dsz; i++) {
8!
1147
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
×
1148
      if (taosArrayPush(chkpDel, &id) == NULL) {
×
1149
        TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1150
      }
1151
    }
1152
    for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
19✔
1153
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
11✔
1154
      if (taosArrayPush(chkpDup, &id) == NULL) {
11!
1155
        TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1156
      }
1157
    }
1158
  }
1159

1160
  taosArrayDestroy(pBackend->chkpSaved);
8✔
1161
  pBackend->chkpSaved = chkpDup;
8✔
1162
  chkpDup = NULL;
8✔
1163

1164
  TAOS_UNUSED(taosThreadRwlockUnlock(&pBackend->chkpDirLock));
8✔
1165

1166
  for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
8!
1167
    int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
×
1168
    char    tbuf[256] = {0};
×
1169
    if (snprintf(tbuf, sizeof(tbuf), "%s%scheckpoint%" PRId64, path, TD_DIRSEP, id) >= sizeof(tbuf)) {
×
1170
      code = TSDB_CODE_OUT_OF_RANGE;
×
1171
      TAOS_CHECK_GOTO(code, NULL, _exception);
×
1172
    }
1173

1174
    stInfo("backend remove obsolete checkpoint: %s", tbuf);
×
1175
    if (taosIsDir(tbuf)) {
×
1176
      taosRemoveDir(tbuf);
×
1177
    }
1178
  }
1179
  taosArrayDestroy(chkpDel);
8✔
1180
  return 0;
8✔
1181
_exception:
×
1182
  taosArrayDestroy(chkpDup);
×
1183
  taosArrayDestroy(chkpDel);
×
1184
  TAOS_UNUSED(taosThreadRwlockUnlock(&pBackend->chkpDirLock));
×
1185
  return code;
×
1186
}
1187

1188
/*
 * Three-way comparator for int64_t checkpoint ids.
 *
 * a, b: pointers to the two int64_t values to compare.
 *
 * Returns -1 if *a < *b, 1 if *a > *b, and 0 if they are equal. The values are
 * read through const-qualified pointers so the arguments' constness is kept.
 */
int chkpIdComp(const void* a, const void* b) {
  const int64_t x = *(const int64_t*)a;
  const int64_t y = *(const int64_t*)b;
  if (x == y) return 0;

  return x < y ? -1 : 1;
}
1195
/*
 * Scan "<pBackend->path><TD_DIRSEP>checkpoints" and load the ids of all
 * "checkpoint<id>" sub-directories into pBackend->chkpSaved, sorted ascending.
 *
 * Returns 0 on success, and also when the checkpoints directory does not exist
 * or cannot be opened. Returns an error code when memory allocation fails or
 * the directory path does not fit the 256-byte buffer.
 */
int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
  int32_t code = 0;
  int32_t nBytes = 0;
  int32_t cap = 256;
  char*   pChkpDir = taosMemoryCalloc(1, cap);
  if (pChkpDir == NULL) {
    return terrno;
  }

  nBytes = snprintf(pChkpDir, cap, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
  if (nBytes <= 0 || nBytes >= cap) {
    // Release the buffer on this early return too.
    taosMemoryFree(pChkpDir);
    return TSDB_CODE_OUT_OF_RANGE;
  }
  if (!taosIsDir(pChkpDir)) {
    taosMemoryFree(pChkpDir);
    return 0;
  }
  TdDirPtr pDir = taosOpenDir(pChkpDir);
  if (pDir == NULL) {
    taosMemoryFree(pChkpDir);
    return 0;
  }
  TdDirEntryPtr de = NULL;
  while ((de = taosReadDir(pDir)) != NULL) {
    if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;

    // Only directories named "checkpoint<number>" contribute an id.
    if (!taosDirEntryIsDir(de)) continue;

    int64_t checkpointId = 0;
    int     ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64, &checkpointId);
    if (ret == 1) {
      if (taosArrayPush(pBackend->chkpSaved, &checkpointId) == NULL) {
        TAOS_CHECK_GOTO(terrno, NULL, _exception);
      }
    }
  }
  taosArraySort(pBackend->chkpSaved, chkpIdComp);

  taosMemoryFree(pChkpDir);
  TAOS_UNUSED(taosCloseDir(&pDir));

  return 0;
_exception:
  taosMemoryFree(pChkpDir);
  TAOS_UNUSED(taosCloseDir(&pDir));
  return code;
}
1246
/*
 * Collect the non-NULL column-family handles of `pBackend` into a newly
 * allocated array.
 *
 * pBackend: task db whose pCf[] slots (one per ginitDict entry) are scanned.
 * ppHandle: out; on a positive return it receives the array, which the caller
 *           must release with taosMemoryFree(). It is left untouched otherwise.
 *
 * Returns the number of handles collected (0 when none are open), or an error
 * code if an allocation fails.
 */
int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
  int32_t code = 0;
  SArray* pHandle = taosArrayInit(8, POINTER_BYTES);
  if (pHandle == NULL) {
    // Fail early instead of pushing into a NULL array.
    return terrno;
  }

  for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
    if (pBackend->pCf[i]) {
      rocksdb_column_family_handle_t* p = pBackend->pCf[i];
      if (taosArrayPush(pHandle, &p) == NULL) {
        code = terrno;
        goto _exception;
      }
    }
  }
  int32_t nCf = taosArrayGetSize(pHandle);
  if (nCf == 0) {
    taosArrayDestroy(pHandle);
    return nCf;
  }

  rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
  if (ppCf == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
  }
  for (int i = 0; i < nCf; i++) {
    ppCf[i] = taosArrayGetP(pHandle, i);
  }

  taosArrayDestroy(pHandle);

  *ppHandle = ppCf;
  return nCf;
_exception:
  taosArrayDestroy(pHandle);
  return code;
}
1280

1281
/*
 * Create a rocksdb checkpoint of `db` in directory `path`.
 *
 * Returns 0 on success, TSDB_CODE_THIRDPARTY_ERROR if the checkpoint object
 * cannot be created or the checkpoint itself fails.
 */
int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
  int32_t               code = -1;
  char*                 err = NULL;
  rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(db, &err);
  if (cp == NULL || err != NULL) {
    stError("failed to do checkpoint at:%s, reason:%s", path, err != NULL ? err : "unknown");
    taosMemoryFreeClear(err);
    code = TSDB_CODE_THIRDPARTY_ERROR;
    goto _ERROR;
  }
  rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err);
  if (err != NULL) {
    stError("failed to do checkpoint at:%s, reason:%s", path, err);
    taosMemoryFreeClear(err);
    code = TSDB_CODE_THIRDPARTY_ERROR;
  } else {
    code = 0;
  }
_ERROR:
  // The destroy function dereferences its argument, so skip it when creation failed.
  if (cp != NULL) {
    rocksdb_checkpoint_object_destroy(cp);
  }
  return code;
}
1303

1304
/*
 * Flush the given column families of `db` and wait for the flush to finish.
 *
 * db:  rocksdb instance.
 * cf:  array of column-family handles to flush.
 * nCf: number of entries in `cf`.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the flush options cannot be
 * created, TSDB_CODE_THIRDPARTY_ERROR if rocksdb reports an error.
 */
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
  rocksdb_flushoptions_t* pOpts = rocksdb_flushoptions_create();
  if (pOpts == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  rocksdb_flushoptions_set_wait(pOpts, 1);

  int   result = 0;
  char* errMsg = NULL;
  rocksdb_flush_cfs(db, pOpts, cf, nCf, &errMsg);
  if (errMsg != NULL) {
    stError("failed to flush db before streamBackend clean up, reason:%s", errMsg);
    taosMemoryFree(errMsg);
    result = TSDB_CODE_THIRDPARTY_ERROR;
  }

  rocksdb_flushoptions_destroy(pOpts);
  return result;
}
1324

1325
int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) {
8✔
1326
  int32_t code = 0;
8✔
1327
  int32_t cap = strlen(path) + 256;
8✔
1328
  int32_t nBytes = 0;
8✔
1329

1330
  char* pChkpDir = taosMemoryCalloc(1, cap);
8!
1331
  char* pChkpIdDir = taosMemoryCalloc(1, cap);
8!
1332
  if (pChkpDir == NULL || pChkpIdDir == NULL) {
8!
1333
    code = terrno;
×
1334
    goto _EXIT;
×
1335
  }
1336

1337
  nBytes = snprintf(pChkpDir, cap, "%s%s%s", path, TD_DIRSEP, "checkpoints");
8✔
1338
  if (nBytes <= 0 || nBytes >= cap) {
8!
1339
    code = TSDB_CODE_OUT_OF_RANGE;
×
1340
    goto _EXIT;
×
1341
  }
1342

1343
  nBytes = snprintf(pChkpIdDir, cap, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId);
8✔
1344
  if (nBytes <= 0 || nBytes >= cap) {
8!
1345
    code = TSDB_CODE_OUT_OF_RANGE;
×
1346
    goto _EXIT;
×
1347
  }
1348

1349
  code = taosMulModeMkDir(pChkpDir, 0755, true);
8✔
1350
  if (code != 0) {
8!
1351
    code = terrno;
×
1352
    stError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
×
1353
    goto _EXIT;
×
1354
  }
1355

1356
  if (taosIsDir(pChkpIdDir)) {
8!
1357
    stInfo("stream rm exist checkpoint%s", pChkpIdDir);
×
1358
    taosRemoveDir(pChkpIdDir);
×
1359
  }
1360

1361
  *chkpDir = pChkpDir;
8✔
1362
  *chkpIdDir = pChkpIdDir;
8✔
1363
  return 0;
8✔
1364
_EXIT:
×
1365
  taosMemoryFree(pChkpDir);
×
1366
  taosMemoryFree(pChkpIdDir);
×
1367
  return code;
×
1368
}
1369

1370
/*
 * Checkpoint every task db registered in `pMeta->pTaskDbUnique` and append one
 * SStreamTaskSnap per task db to `pSnap`.
 *
 * arg:   the SStreamMeta that owns the task dbs (vnode task->db).
 * pSnap: array that receives the snapshot descriptors; each descriptor's
 *        dbPrefixPath is a duplicated string.
 *
 * For each task db the current chkpId is marked in use before the checkpoint
 * is taken; on any failure the mark and the reference are dropped and the scan
 * stops. Task dbs whose reference cannot be acquired are skipped.
 *
 * Returns 0 on success, otherwise the first error code encountered.
 */
int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
  // vnode task->db
  SStreamMeta* pMeta = arg;

  streamMutexLock(&pMeta->backendMutex);
  void*   pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL);
  int32_t code = 0;

  while (pIter) {
    STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;

    void* p = taskDbAddRef(pTaskDb);
    if (p == NULL) {
      terrno = 0;
      pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
      continue;
    }

    // add chkpId to in-use-ckpkIdSet
    taskDbRefChkp(pTaskDb, pTaskDb->chkpId);

    code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId, ((SStreamTask*)pTaskDb->pTask)->chkInfo.processedVer);
    bool failed = (code != 0);

    if (!failed) {
      SStreamTask*    pTask = pTaskDb->pTask;
      SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
                              .taskId = pTask->id.taskId,
                              .chkpId = pTaskDb->chkpId,
                              .dbPrefixPath = taosStrdup(pTaskDb->path)};
      if (snap.dbPrefixPath == NULL) {
        code = terrno;
        failed = true;
      } else if (taosArrayPush(pSnap, &snap) == NULL) {
        code = terrno;
        // The array did not take the descriptor, so the duplicated path is still ours.
        taosMemoryFree(snap.dbPrefixPath);
        failed = true;
      }
    }

    if (failed) {
      // remove chkpId from in-use-ckpkIdSet
      // NOTE(review): leaving a taosHashIterate loop early may require cancelling the iterator; confirm.
      taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
      taskDbRemoveRef(pTaskDb);
      break;
    }

    taskDbRemoveRef(pTaskDb);
    pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
  }
  streamMutexUnlock(&pMeta->backendMutex);
  return code;
}
1424
/*
 * Release the in-use checkpoint marks held by a list of snapshots.
 *
 * arg:       the SStreamMeta that owns pTaskDbUnique.
 * pSnapInfo: array of SStreamTaskSnap; NULL is accepted and yields 0.
 *
 * Each snapshot is mapped to its task db through the key
 * "0x<streamId hex>-0x<taskId hex>". Snapshots with no matching task db are
 * logged and skipped.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_RANGE if a key cannot be formatted.
 */
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
  if (pSnapInfo == NULL) return 0;

  SStreamMeta* pMeta = arg;
  int32_t      status = 0;
  int32_t      keyCap = 256;
  int32_t      keyLen = 0;
  char         key[256] = {0};

  streamMutexLock(&pMeta->backendMutex);

  for (int idx = 0; idx < taosArrayGetSize(pSnapInfo); idx++) {
    SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, idx);

    keyLen = snprintf(key, keyCap, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
    if (keyLen <= 0 || keyLen >= keyCap) {
      status = TSDB_CODE_OUT_OF_RANGE;
      break;
    }

    STaskDbWrapper** ppTaskDb = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
    // The key is consumed by the lookup; clear it for the next round.
    memset(key, 0, sizeof(key));

    if (ppTaskDb != NULL && *ppTaskDb != NULL) {
      taskDbUnRefChkp(*ppTaskDb, pSnap->chkpId);
    } else {
      stWarn("stream backend:%p failed to find task db, streamId:% " PRId64, pMeta, pSnap->streamId);
    }
  }

  streamMutexUnlock(&pMeta->backendMutex);
  return status;
}
1453
#ifdef BUILD_NO_CALL
1454
/*
 * Placeholder kept for the BUILD_NO_CALL configuration: ignores both arguments
 * and always reports success.
 */
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
  const int32_t ok = 0;
  return ok;
}
1463
/*
 * Placeholder kept for the BUILD_NO_CALL configuration: ignores both arguments
 * and always reports success.
 */
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
  const int32_t ok = 0;
  return ok;
}
1477
#endif
1478

1479
/*
1480
   0
1481
*/
1482

1483
void* taskAcquireDb(int64_t refId) {
1✔
1484
  // acquire
1485
  void* p = taosAcquireRef(taskDbWrapperId, refId);
1✔
1486
  return p;
1✔
1487
}
1488
void taskReleaseDb(int64_t refId) {
1✔
1489
  // release
1490
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
1✔
1491
}
1✔
1492

1493
int64_t taskGetDBRef(void* arg) {
2✔
1494
  if (arg == NULL) return -1;
2!
1495
  STaskDbWrapper* pDb = arg;
2✔
1496
  return pDb->refId;
2✔
1497
}
1498

1499
int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) {
25✔
1500
  TdFilePtr pFile = NULL;
25✔
1501
  int32_t   code = 0;
25✔
1502

1503
  char    buf[256] = {0};
25✔
1504
  int32_t nBytes = 0;
25✔
1505

1506
  int32_t len = strlen(pChkpIdDir);
25✔
1507
  if (len == 0) {
25!
1508
    code = TSDB_CODE_INVALID_PARA;
×
1509
    stError("failed to load extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1510
    return code;
×
1511
  }
1512

1513
  int32_t cap = len + 64;
25✔
1514
  char*   pDst = taosMemoryCalloc(1, cap);
25!
1515
  if (pDst == NULL) {
25!
1516
    code = terrno;
×
1517
    stError("failed to alloc memory to load extra info, dir:%s", pChkpIdDir);
×
1518
    goto _EXIT;
×
1519
  }
1520

1521
  nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
25✔
1522
  if (nBytes <= 0 || nBytes >= cap) {
25!
1523
    code = TSDB_CODE_OUT_OF_RANGE;
×
1524
    stError("failed to build dst to load extra info, dir:%s", pChkpIdDir);
×
1525
    goto _EXIT;
×
1526
  }
1527

1528
  pFile = taosOpenFile(pDst, TD_FILE_READ);
25✔
1529
  if (pFile == NULL) {
25✔
1530
    // compatible with previous version
1531
    *processId = -1;
19✔
1532
    code = 0;
19✔
1533
    stWarn("failed to open file to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno));
19!
1534
    goto _EXIT;
19✔
1535
  }
1536

1537
  if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
6!
1538
    code = terrno;
×
1539
    stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1540
    goto _EXIT;
×
1541
  }
1542

1543
  if (sscanf(buf, "%" PRId64 " %" PRId64, chkpId, processId) < 2) {
6!
1544
    code = TSDB_CODE_INVALID_PARA;
×
1545
    stError("failed to read file content to load extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1546
    goto _EXIT;
×
1547
  }
1548
  code = 0;
6✔
1549
_EXIT:
25✔
1550
  taosMemoryFree(pDst);
25!
1551
  TAOS_UNUSED(taosCloseFile(&pFile));
25✔
1552
  return code;
25✔
1553
}
1554
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
8✔
1555
  int32_t code = 0;
8✔
1556

1557
  TdFilePtr pFile = NULL;
8✔
1558

1559
  char    buf[256] = {0};
8✔
1560
  int32_t nBytes = 0;
8✔
1561

1562
  int32_t len = strlen(pChkpIdDir);
8✔
1563
  if (len == 0) {
8!
1564
    code = TSDB_CODE_INVALID_PARA;
×
1565
    stError("failed to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1566
    return code;
×
1567
  }
1568

1569
  int32_t cap = len + 64;
8✔
1570
  char*   pDst = taosMemoryCalloc(1, cap);
8!
1571
  if (pDst == NULL) {
8!
1572
    code = terrno;
×
1573
    stError("failed to alloc memory to add extra info, dir:%s", pChkpIdDir);
×
1574
    goto _EXIT;
×
1575
  }
1576

1577
  nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
8✔
1578
  if (nBytes <= 0 || nBytes >= cap) {
8!
1579
    code = TSDB_CODE_OUT_OF_RANGE;
×
1580
    stError("failed to build dst to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1581
    goto _EXIT;
×
1582
  }
1583

1584
  pFile = taosOpenFile(pDst, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
8✔
1585
  if (pFile == NULL) {
8!
1586
    code = terrno;
×
1587
    stError("failed to open file to add extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1588
    goto _EXIT;
×
1589
  }
1590

1591
  nBytes = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64, chkpId, processId);
8✔
1592
  if (nBytes <= 0 || nBytes >= sizeof(buf)) {
8!
1593
    code = TSDB_CODE_OUT_OF_RANGE;
×
1594
    stError("failed to build content to add extra info, dir:%s,reason:%s", pChkpIdDir, tstrerror(code));
×
1595
    goto _EXIT;
×
1596
  }
1597

1598
  if (nBytes != taosWriteFile(pFile, buf, nBytes)) {
8!
1599
    code = terrno;
×
1600
    stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1601
    goto _EXIT;
×
1602
  }
1603
  code = 0;
8✔
1604

1605
_EXIT:
8✔
1606
  TAOS_UNUSED(taosCloseFile(&pFile));
8✔
1607
  taosMemoryFree(pDst);
8!
1608
  return code;
8✔
1609
}
1610
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId) {
8✔
1611
  STaskDbWrapper* pTaskDb = arg;
8✔
1612
  int64_t         st = taosGetTimestampMs();
8✔
1613
  int32_t         code = 0;
8✔
1614
  int64_t         refId = pTaskDb->refId;
8✔
1615

1616
  if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
8!
1617
    code = terrno;
×
1618
    return code;
×
1619
  }
1620

1621
  char* pChkpDir = NULL;
8✔
1622
  char* pChkpIdDir = NULL;
8✔
1623
  if ((code = chkpPreBuildDir(pTaskDb->path, chkpId, &pChkpDir, &pChkpIdDir)) < 0) {
8!
1624
    goto _EXIT;
×
1625
  }
1626
  // Get all cf and acquire cfWrappter
1627
  rocksdb_column_family_handle_t** ppCf = NULL;
8✔
1628

1629
  int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
8✔
1630
  stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
8✔
1631

1632
  int64_t written = atomic_load_64(&pTaskDb->dataWritten);
8✔
1633

1634
  // flush db
1635
  if (written > 0) {
8✔
1636
    stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64, pTaskDb, pChkpIdDir, written);
5✔
1637
    code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf);
5✔
1638
    if (code != 0) goto _EXIT;
5!
1639
  } else {
1640
    stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64, pTaskDb, pChkpIdDir, written);
3!
1641
  }
1642

1643
  // do checkpoint
1644
  if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
8!
1645
    stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
×
1646
    goto _EXIT;
×
1647
  } else {
1648
    stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
11✔
1649
            taosGetTimestampMs() - st);
1650
  }
1651

1652
  // add extra info to checkpoint
1653
  if ((code = chkpAddExtraInfo(pChkpIdDir, chkpId, processId)) != 0) {
8!
1654
    stError("stream backend:%p failed to add extra info to checkpoint at:%s", pTaskDb, pChkpIdDir);
×
1655
    goto _EXIT;
×
1656
  }
1657

1658
  // delete ttl checkpoint
1659
  code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
8✔
1660
  if (code < 0) {
8!
1661
    goto _EXIT;
×
1662
  }
1663

1664
  TAOS_UNUSED(atomic_store_64(&pTaskDb->dataWritten, 0));
8✔
1665
  pTaskDb->chkpId = chkpId;
8✔
1666

1667
_EXIT:
8✔
1668

1669
  // clear checkpoint dir if failed
1670
  if (code != 0 && pChkpDir != NULL) {
8!
1671
    if (taosDirExist(pChkpIdDir)) {
×
1672
      taosRemoveDir(pChkpIdDir);
×
1673
    }
1674
  }
1675
  taosMemoryFree(pChkpIdDir);
8!
1676
  taosMemoryFree(pChkpDir);
8!
1677

1678
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
8✔
1679
  taosMemoryFree(ppCf);
8!
1680
  return code;
8✔
1681
}
1682

1683
/*
 * Thin alias of taskDbDoCheckpoint(): forwards `arg`, `chkpId` and
 * `processVer` unchanged and returns its result.
 */
int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId, int64_t processVer) {
  int32_t result = taskDbDoCheckpoint(arg, chkpId, processVer);
  return result;
}
1686

1687
SListNode* streamBackendAddCompare(void* backend, void* arg) {
2✔
1688
  SBackendWrapper* pHandle = (SBackendWrapper*)backend;
2✔
1689
  SListNode*       node = NULL;
2✔
1690
  streamMutexLock(&pHandle->mutex);
2✔
1691
  node = tdListAdd(pHandle->list, arg);
2✔
1692
  streamMutexUnlock(&pHandle->mutex);
2✔
1693
  return node;
2✔
1694
}
1695
void streamBackendDelCompare(void* backend, void* arg) {
×
1696
  SBackendWrapper* pHandle = (SBackendWrapper*)backend;
×
1697
  SListNode*       node = NULL;
×
1698
  streamMutexLock(&pHandle->mutex);
×
1699
  node = tdListPopNode(pHandle->list, arg);
×
1700
  streamMutexUnlock(&pHandle->mutex);
×
1701
  if (node) {
×
1702
    streamStateDestroyCompar(node->data);
×
1703
    taosMemoryFree(node);
×
1704
  }
1705
}
×
1706
// Releases everything owned by a RocksdbCfInst and then the instance itself.
// The per-column-family arrays are walked with one slot per ginitDict entry.
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
  int nSlots = sizeof(ginitDict) / sizeof(ginitDict[0]);

  // column family handles: only the non-NULL slots hold a handle to destroy
  if (inst->pHandle != NULL) {
    for (int slot = 0; slot < nSlots; ++slot) {
      if (inst->pHandle[slot] != NULL) {
        rocksdb_column_family_handle_destroy(inst->pHandle[slot]);
      }
    }
    taosMemoryFree(inst->pHandle);
  }

  // per-column-family options together with the table options kept in param[]
  if (inst->cfOpt != NULL) {
    for (int slot = 0; slot < nSlots; ++slot) {
      rocksdb_options_destroy(inst->cfOpt[slot]);
      rocksdb_block_based_options_destroy(((RocksdbCfParam*)inst->param)[slot].tableOpt);
    }
    taosMemoryFreeClear(inst->cfOpt);
    taosMemoryFreeClear(inst->param);
  }

  if (inst->wOpt != NULL) {
    rocksdb_writeoptions_destroy(inst->wOpt);
  }
  if (inst->rOpt != NULL) {
    rocksdb_readoptions_destroy(inst->rOpt);
  }

  taosMemoryFree(inst);
}
2✔
1728

1729
// |key|-----value------|
1730
// |key|ttl|len|userData
1731

1732
// Byte-wise comparator for raw keys.
// The shared prefix is compared with memcmp(); when the prefixes are equal the
// shorter key sorts first (-1 / 1), and equal lengths compare as 0.
// `state` is not used.
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  int common = (bLen <= aLen) ? bLen : aLen;
  int order = memcmp(aBuf, bBuf, common);

  if (order != 0) {
    return order;
  }
  if (aLen == bLen) {
    return 0;
  }
  return (aLen < bLen) ? -1 : 1;
}
1746
// Decodes the fixed-width int64 at the start of `v` and treats it as an expiry
// timestamp. Returns 1 when it is non-zero and earlier than taosGetTimestampMs(),
// otherwise 0 (a stored 0 is never stale).
int streamStateValueIsStale(char* v) {
  int64_t expireAt = 0;
  TAOS_UNUSED(taosDecodeFixedI64(v, &expireAt));

  if (expireAt == 0) {
    return 0;
  }
  return (expireAt < taosGetTimestampMs()) ? 1 : 0;
}
1751
// Runs the stale check on the value the iterator currently points at.
// The value length reported by RocksDB is fetched but not used.
int iterValueIsStale(rocksdb_iterator_t* iter) {
  size_t valLen;
  char*  pVal = (char*)rocksdb_iter_value(iter, &valLen);

  int stale = streamStateValueIsStale(pVal);
  return stale;
}
1756
// Copies the NUL-terminated string `k` into `buf` without its terminator and
// returns the number of bytes copied.
int defaultKeyEncode(void* k, char* buf) {
  char* src = (char*)k;
  int   nBytes = strlen(src);

  memcpy(buf, src, nBytes);
  return nBytes;
}
1761
// Copies the NUL-terminated string in `buf` into `k` without its terminator and
// returns the number of bytes copied.
int defaultKeyDecode(void* k, char* buf) {
  int nBytes = strlen(buf);

  memcpy(k, buf, nBytes);
  return nBytes;
}
1766
// Debug helper: formats the string key `k` as "key: <k>" into `buf` and returns
// the character count reported by sprintf().
int defaultKeyToString(void* k, char* buf) {
  char* key = (char*)k;
  int   written = sprintf(buf, "key: %s", key);
  return written;
}
1770
//
1771
//  SStateKey
1772
//  |--groupid--|---ts------|--opNum----|
1773
//  |--uint64_t-|-uint64_t--|--int64_t--|
1774
//
1775
//
1776
//
1777
int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
2,299✔
1778
  SStateKey key1, key2;
1779
  memset(&key1, 0, sizeof(key1));
2,299✔
1780
  memset(&key2, 0, sizeof(key2));
2,299✔
1781

1782
  char* p1 = (char*)aBuf;
2,299✔
1783
  char* p2 = (char*)bBuf;
2,299!
1784

1785
  p1 = taosDecodeFixedU64(p1, &key1.key.groupId);
2,299!
1786
  p2 = taosDecodeFixedU64(p2, &key2.key.groupId);
2,299!
1787

1788
  p1 = taosDecodeFixedI64(p1, &key1.key.ts);
2,299!
1789
  p2 = taosDecodeFixedI64(p2, &key2.key.ts);
2,299!
1790

1791
  TAOS_UNUSED(taosDecodeFixedI64(p1, &key1.opNum));
1792
  TAOS_UNUSED(taosDecodeFixedI64(p2, &key2.opNum));
1793

1794
  return stateKeyCmpr(&key1, sizeof(key1), &key2, sizeof(key2));
2,299✔
1795
}
1796

1797
int stateKeyEncode(void* k, char* buf) {
469✔
1798
  SStateKey* key = k;
469✔
1799
  int        len = 0;
469✔
1800
  len += taosEncodeFixedU64((void**)&buf, key->key.groupId);
469!
1801
  len += taosEncodeFixedI64((void**)&buf, key->key.ts);
469!
1802
  len += taosEncodeFixedI64((void**)&buf, key->opNum);
469!
1803
  return len;
469✔
1804
}
1805
int stateKeyDecode(void* k, char* buf) {
4✔
1806
  SStateKey* key = k;
4✔
1807
  int        len = 0;
4✔
1808
  char*      p = buf;
4✔
1809
  p = taosDecodeFixedU64(p, &key->key.groupId);
4!
1810
  p = taosDecodeFixedI64(p, &key->key.ts);
4!
1811
  p = taosDecodeFixedI64(p, &key->opNum);
4!
1812
  return p - buf;
4✔
1813
}
1814

1815
int32_t stateKeyToString(void* k, char* buf) {
253✔
1816
  SStateKey* key = k;
253✔
1817
  int        n = 0;
253✔
1818
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->key.groupId);
253✔
1819
  n += sprintf(buf + n, "ts:%" PRIi64 ",", key->key.ts);
253✔
1820
  n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
253✔
1821
  return n;
253✔
1822
}
1823

1824
//
1825
// SStateSessionKey
1826
//  |-----------SSessionKey----------|
1827
//  |-----STimeWindow-----|
1828
//  |---skey--|---ekey----|--groupId-|--opNum--|
1829
//  |---int64-|--int64_t--|--uint64--|--int64_t|
1830
// |
1831
//
1832
int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
4,198✔
1833
  SStateSessionKey w1, w2;
1834
  memset(&w1, 0, sizeof(w1));
4,198✔
1835
  memset(&w2, 0, sizeof(w2));
4,198✔
1836

1837
  char* p1 = (char*)aBuf;
4,198✔
1838
  char* p2 = (char*)bBuf;
4,198!
1839

1840
  p1 = taosDecodeFixedI64(p1, &w1.key.win.skey);
4,198!
1841
  p2 = taosDecodeFixedI64(p2, &w2.key.win.skey);
4,198!
1842

1843
  p1 = taosDecodeFixedI64(p1, &w1.key.win.ekey);
4,198!
1844
  p2 = taosDecodeFixedI64(p2, &w2.key.win.ekey);
4,198!
1845

1846
  p1 = taosDecodeFixedU64(p1, &w1.key.groupId);
4,198!
1847
  p2 = taosDecodeFixedU64(p2, &w2.key.groupId);
4,198!
1848

1849
  p1 = taosDecodeFixedI64(p1, &w1.opNum);
4,198!
1850
  p2 = taosDecodeFixedI64(p2, &w2.opNum);
4,198✔
1851

1852
  return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
4,198✔
1853
}
1854
int stateSessionKeyEncode(void* k, char* buf) {
408✔
1855
  SStateSessionKey* sess = k;
408✔
1856
  int               len = 0;
408✔
1857
  len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey);
408!
1858
  len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey);
408!
1859
  len += taosEncodeFixedU64((void**)&buf, sess->key.groupId);
408!
1860
  len += taosEncodeFixedI64((void**)&buf, sess->opNum);
408!
1861
  return len;
408✔
1862
}
1863
int stateSessionKeyDecode(void* k, char* buf) {
409✔
1864
  SStateSessionKey* sess = k;
409✔
1865
  int               len = 0;
409✔
1866

1867
  char* p = buf;
409✔
1868
  p = taosDecodeFixedI64(p, &sess->key.win.skey);
409!
1869
  p = taosDecodeFixedI64(p, &sess->key.win.ekey);
409!
1870
  p = taosDecodeFixedU64(p, &sess->key.groupId);
409!
1871
  p = taosDecodeFixedI64(p, &sess->opNum);
409!
1872
  return p - buf;
409✔
1873
}
1874
int stateSessionKeyToString(void* k, char* buf) {
101✔
1875
  SStateSessionKey* key = k;
101✔
1876
  int               n = 0;
101✔
1877
  n += sprintf(buf + n, "[skey:%" PRIi64 ",", key->key.win.skey);
101✔
1878
  n += sprintf(buf + n, "ekey:%" PRIi64 ",", key->key.win.ekey);
101✔
1879
  n += sprintf(buf + n, "groupId:%" PRIu64 ",", key->key.groupId);
101✔
1880
  n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
101✔
1881
  return n;
101✔
1882
}
1883

1884
/**
1885
 *  SWinKey
1886
 *  |------groupId------|-----ts------|
1887
 *  |------uint64-------|----int64----|
1888
 */
1889
int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
3,319✔
1890
  SWinKey w1, w2;
1891
  memset(&w1, 0, sizeof(w1));
3,319✔
1892
  memset(&w2, 0, sizeof(w2));
3,319✔
1893

1894
  char* p1 = (char*)aBuf;
3,319✔
1895
  char* p2 = (char*)bBuf;
3,319!
1896

1897
  p1 = taosDecodeFixedU64(p1, &w1.groupId);
3,319!
1898
  p2 = taosDecodeFixedU64(p2, &w2.groupId);
3,319!
1899

1900
  p1 = taosDecodeFixedI64(p1, &w1.ts);
3,319!
1901
  p2 = taosDecodeFixedI64(p2, &w2.ts);
3,319✔
1902

1903
  return winKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
3,319✔
1904
}
1905

1906
int winKeyEncode(void* k, char* buf) {
305✔
1907
  SWinKey* key = k;
305✔
1908
  int      len = 0;
305✔
1909
  len += taosEncodeFixedU64((void**)&buf, key->groupId);
305!
1910
  len += taosEncodeFixedI64((void**)&buf, key->ts);
305!
1911
  return len;
305✔
1912
}
1913

1914
int winKeyDecode(void* k, char* buf) {
8✔
1915
  SWinKey* key = k;
8✔
1916
  int      len = 0;
8✔
1917
  char*    p = buf;
8✔
1918
  p = taosDecodeFixedU64(p, &key->groupId);
8!
1919
  p = taosDecodeFixedI64(p, &key->ts);
8!
1920
  return len;
8✔
1921
}
1922

1923
int winKeyToString(void* k, char* buf) {
102✔
1924
  SWinKey* key = k;
102✔
1925
  int      n = 0;
102✔
1926
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
102✔
1927
  n += sprintf(buf + n, "ts:%" PRIi64 "]", key->ts);
102✔
1928
  return n;
102✔
1929
}
1930
/*
1931
 * STupleKey
1932
 * |---groupId---|---ts---|---exprIdx---|
1933
 * |---uint64--|---int64--|---int32-----|
1934
 */
1935
int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
2,978✔
1936
  STupleKey w1, w2;
1937
  memset(&w1, 0, sizeof(w1));
2,978✔
1938
  memset(&w2, 0, sizeof(w2));
2,978✔
1939

1940
  char* p1 = (char*)aBuf;
2,978✔
1941
  char* p2 = (char*)bBuf;
2,978!
1942

1943
  p1 = taosDecodeFixedU64(p1, &w1.groupId);
2,978!
1944
  p2 = taosDecodeFixedU64(p2, &w2.groupId);
2,978!
1945

1946
  p1 = taosDecodeFixedI64(p1, &w1.ts);
2,978!
1947
  p2 = taosDecodeFixedI64(p2, &w2.ts);
2,978!
1948

1949
  p1 = taosDecodeFixedI32(p1, &w1.exprIdx);
2,978!
1950
  p2 = taosDecodeFixedI32(p2, &w2.exprIdx);
2,978✔
1951

1952
  return STupleKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
2,978✔
1953
}
1954

1955
int tupleKeyEncode(void* k, char* buf) {
300✔
1956
  STupleKey* key = k;
300✔
1957
  int        len = 0;
300✔
1958
  len += taosEncodeFixedU64((void**)&buf, key->groupId);
300!
1959
  len += taosEncodeFixedI64((void**)&buf, key->ts);
300!
1960
  len += taosEncodeFixedI32((void**)&buf, key->exprIdx);
300!
1961
  return len;
300✔
1962
}
1963
int tupleKeyDecode(void* k, char* buf) {
×
1964
  STupleKey* key = k;
×
1965
  int        len = 0;
×
1966
  char*      p = buf;
×
1967
  p = taosDecodeFixedU64(p, &key->groupId);
×
1968
  p = taosDecodeFixedI64(p, &key->ts);
×
1969
  p = taosDecodeFixedI32(p, &key->exprIdx);
×
1970
  return len;
×
1971
}
1972
int tupleKeyToString(void* k, char* buf) {
100✔
1973
  int        n = 0;
100✔
1974
  STupleKey* key = k;
100✔
1975
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
100✔
1976
  n += sprintf(buf + n, "ts:%" PRIi64 ",", key->ts);
100✔
1977
  n += sprintf(buf + n, "exprIdx:%d]", key->exprIdx);
100✔
1978
  return n;
100✔
1979
}
1980

1981
// Comparator for keys holding a single fixed-width int64: decodes one value from
// each buffer and orders them numerically (-1, 0 or 1).
// `state`, `aLen` and `bLen` are not used.
int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  int64_t lhs = 0;
  int64_t rhs = 0;

  TAOS_UNUSED(taosDecodeFixedI64((char*)aBuf, &lhs));
  TAOS_UNUSED(taosDecodeFixedI64((char*)bBuf, &rhs));

  if (lhs < rhs) {
    return -1;
  }
  return (lhs > rhs) ? 1 : 0;
}
1996
// Serializes the int64 pointed to by `k` into `buf` and returns the length
// reported by the encoder.
int parKeyEncode(void* k, char* buf) {
  int64_t* pGroupId = k;
  return taosEncodeFixedI64((void**)&buf, *pGroupId);
}
2001
// Reads one fixed-width int64 from `buf` into the int64 at `k`.
// Returns how far the decode cursor advanced.
int parKeyDecode(void* k, char* buf) {
  int64_t* pGroupId = k;
  char*    cursor = taosDecodeFixedI64(buf, pGroupId);

  return cursor - buf;
}
2008
// Renders the int64 pointed to by `k` as "[groupId:<value>]" into `buf` and
// returns the character count reported by sprintf().
int parKeyToString(void* k, char* buf) {
  int64_t* pGroupId = k;
  return sprintf(buf, "[groupId:%" PRIi64 "]", *pGroupId);
}
2014
int32_t valueToString(void* k, char* buf) {
×
2015
  SStreamValue* key = k;
×
2016
  int           n = 0;
×
2017
  n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp);
×
2018
  n += sprintf(buf + n, "len:%d,", key->len);
×
2019
  n += sprintf(buf + n, "data:%s]", key->data);
×
2020
  return n;
×
2021
}
2022

2023
/* returns 1 if the value is stale, 0 if it is not */
2024
int32_t valueIsStale(void* k, int64_t ts) {
×
2025
  SStreamValue* key = k;
×
2026
  if (key->unixTimestamp < ts) {
×
2027
    return 1;
×
2028
  }
2029
  return 0;
×
2030
}
2031

2032
// Comparator destructor callback; there is nothing to release, so it is a no-op.
void destroyCompare(void* arg) {
  TAOS_UNUSED(arg);
}
2036

2037
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
844✔
2038
  int32_t      code = 0;
844✔
2039
  SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)};
844✔
2040
  int32_t      len = 0;
844✔
2041
  char*        dst = NULL;
844✔
2042
  if (vlen > 512) {
844✔
2043
    dst = taosMemoryCalloc(1, vlen + 128);
2!
2044
    if (dst == NULL) {
2!
2045
      return terrno;
×
2046
    }
2047
    int32_t dstCap = vlen + 128;
2✔
2048
    int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap);
2✔
2049
    if (compressedSize < vlen) {
2!
2050
      key.compress = 1;
2✔
2051
      key.len = compressedSize;
2✔
2052
      value = dst;
2✔
2053
    }
2054
  }
2055

2056
  if (*dest == NULL) {
844✔
2057
    size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
836✔
2058
    char*  p = taosMemoryCalloc(1, size);
836!
2059
    if (p == NULL) {
836!
2060
      code = terrno;
×
2061
      goto _exception;
×
2062
    }
2063
    char* buf = p;
836✔
2064
    len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
836!
2065
    len += taosEncodeFixedI32((void**)&buf, key.len);
836!
2066
    len += taosEncodeFixedI32((void**)&buf, key.rawLen);
836!
2067
    len += taosEncodeFixedI8((void**)&buf, key.compress);
836!
2068
    if (value != NULL && key.len != 0) {
836✔
2069
      len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
1,632!
2070
    }
2071
    *dest = p;
836✔
2072
  } else {
2073
    char* buf = *dest;
8✔
2074
    len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
8!
2075
    len += taosEncodeFixedI32((void**)&buf, key.len);
8!
2076
    len += taosEncodeFixedI32((void**)&buf, key.rawLen);
8!
2077
    len += taosEncodeFixedI8((void**)&buf, key.compress);
8!
2078
    if (value != NULL && key.len != 0) {
8!
2079
      len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
16!
2080
    }
2081
  }
2082

2083
  taosMemoryFree(dst);
844!
2084
  return len;
844✔
2085
_exception:
×
2086
  taosMemoryFree(dst);
×
2087
  return code;
×
2088
}
2089

2090
/*
2091
 *  ret >= 0 : found valid value
2092
 *  ret < 0 : error or timeout
2093
 */
2094
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
914✔
2095
  int32_t      code = -1;
914✔
2096
  SStreamValue key = {0};
914✔
2097
  char*        p = value;
914✔
2098

2099
  char* pCompressData = NULL;
914✔
2100
  char* pOutput = NULL;
914✔
2101
  if (streamStateValueIsStale(p)) {
914!
2102
    code = TSDB_CODE_INVALID_DATA_FMT;
×
2103
    goto _EXCEPT;
×
2104
  }
2105

2106
  p = taosDecodeFixedI64(p, &key.unixTimestamp);
914!
2107
  p = taosDecodeFixedI32(p, &key.len);
914✔
2108
  if (key.len == 0) {
914!
2109
    code = 0;
×
2110
    goto _EXCEPT;
×
2111
  }
2112
  if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) {
914!
2113
    // compatiable with previous data
2114
    p = taosDecodeBinary(p, (void**)&pOutput, key.len);
×
2115
    if (p == NULL) {
×
2116
      code = terrno;
×
2117
      goto _EXCEPT;
×
2118
    }
2119

2120
  } else {
2121
    p = taosDecodeFixedI32(p, &key.rawLen);
914✔
2122
    p = taosDecodeFixedI8(p, &key.compress);
914✔
2123
    if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) {
914!
2124
      stError("vlen: %d, read len: %d", vlen, key.len);
×
2125
      code = TSDB_CODE_INVALID_DATA_FMT;
×
2126
      goto _EXCEPT;
×
2127
    }
2128
    if (key.compress == 1) {
914✔
2129
      p = taosDecodeBinary(p, (void**)&pCompressData, key.len);
2!
2130
      if (p == NULL) {
2!
2131
        code = terrno;
×
2132
        goto _EXCEPT;
×
2133
      }
2134
      pOutput = taosMemoryCalloc(1, key.rawLen);
2!
2135
      if (pOutput == NULL) {
2!
2136
        code = terrno;
×
2137
        goto _EXCEPT;
×
2138
      }
2139

2140
      int32_t rawLen = LZ4_decompress_safe(pCompressData, pOutput, key.len, key.rawLen);
2✔
2141
      if (rawLen != key.rawLen) {
2!
2142
        stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len);
×
2143
        code = TSDB_CODE_INVALID_DATA_FMT;
×
2144
        goto _EXCEPT;
×
2145
      }
2146
      key.len = rawLen;
2✔
2147
    } else {
2148
      p = taosDecodeBinary(p, (void**)&pOutput, key.len);
912!
2149
      if (p == NULL) {
912!
2150
        code = terrno;
×
2151
        goto _EXCEPT;
×
2152
      }
2153
    }
2154
  }
2155

2156
  if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
914!
2157

2158
  code = 0;
914✔
2159
  if (dest) {
914✔
2160
    *dest = pOutput;
713✔
2161
    pOutput = NULL;
713✔
2162
  }
2163
  taosMemoryFree(pCompressData);
914!
2164
  taosMemoryFree(pOutput);
914!
2165
  return key.len;
914✔
2166

2167
_EXCEPT:
×
2168
  if (dest != NULL) *dest = NULL;
×
2169
  if (ttl != NULL) *ttl = 0;
×
2170

2171
  taosMemoryFree(pOutput);
×
2172
  taosMemoryFree(pCompressData);
×
2173
  return code;
×
2174
}
2175

2176
const char* compareDefaultName(void* arg) {
309✔
2177
  TAOS_UNUSED(arg);
2178
  return ginitDict[0].key;
309✔
2179
}
2180
const char* compareStateName(void* arg) {
130✔
2181
  TAOS_UNUSED(arg);
2182
  return ginitDict[1].key;
130✔
2183
}
2184
const char* compareWinKeyName(void* arg) { return ginitDict[2].key; }
14✔
2185
const char* compareSessionKeyName(void* arg) {
12✔
2186
  TAOS_UNUSED(arg);
2187
  return ginitDict[3].key;
12✔
2188
}
2189
const char* compareFuncKeyName(void* arg) {
15✔
2190
  TAOS_UNUSED(arg);
2191
  return ginitDict[4].key;
15✔
2192
}
2193
const char* compareParKeyName(void* arg) {
29✔
2194
  TAOS_UNUSED(arg);
2195
  return ginitDict[5].key;
29✔
2196
}
2197
const char* comparePartagKeyName(void* arg) {
×
2198
  TAOS_UNUSED(arg);
2199
  return ginitDict[6].key;
×
2200
}
2201

2202
// Destructor callback for the compaction filter factory state.
// It performs no cleanup: a NULL state returns immediately and a non-NULL state
// is left untouched.
void destroyCompactFilteFactory(void* arg) {
  if (arg == NULL) {
    return;
  }
}
2205
// Name callback for the default compaction filter factory.
// The factory state is not needed to produce the name, so the previously unused
// local that aliased it has been removed.
const char* compactFilteFactoryName(void* arg) {
  (void)arg;
  return "stream_compact_factory_filter_default";
}
2209
// Name callback for the session compaction filter factory.
// The factory state is not needed to produce the name, so the previously unused
// local that aliased it has been removed.
const char* compactFilteFactoryNameSess(void* arg) {
  (void)arg;
  return "stream_compact_factory_filter_sess";
}
2213
// Name callback for the state compaction filter factory.
// The factory state is not needed to produce the name, so the previously unused
// local that aliased it has been removed.
const char* compactFilteFactoryNameState(void* arg) {
  (void)arg;
  return "stream_compact_factory_filter_state";
}
2217
// Name callback for the fill compaction filter factory.
// The factory state is not needed to produce the name, so the previously unused
// local that aliased it has been removed.
const char* compactFilteFactoryNameFill(void* arg) {
  (void)arg;
  return "stream_compact_factory_filter_fill";
}
2221
// Name callback for the func compaction filter factory.
// The factory state is not needed to produce the name, so the previously unused
// local that aliased it has been removed.
const char* compactFilteFactoryNameFunc(void* arg) {
  (void)arg;
  return "stream_compact_factory_filter_func";
}
2225

2226
// Destructor callback for a compaction filter; there is nothing to release.
void destroyCompactFilte(void* arg) {
  TAOS_UNUSED(arg);
}
7✔
2227
// Default compaction filter callback: returns 1 when the value's leading expiry
// timestamp says it is stale (see streamStateValueIsStale) and 0 otherwise.
// Only `val` is inspected; the value is never rewritten.
// NOTE(review): per the RocksDB C API a non-zero return asks compaction to drop
// the entry - confirm against the rocksdb/c.h version in use.
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                           char** newval, size_t* newvlen, unsigned char* value_changed) {
  if (streamStateValueIsStale((char*)val)) {
    return 1;
  }
  return 0;
}
2231
// Name callback for the default compaction filter.
const char* compactFilteName(void* arg) {
  const char* name = "stream_filte_default";
  return name;
}
×
2232
// Name callback for the session compaction filter.
const char* compactFilteNameSess(void* arg) {
  const char* name = "stream_filte_sess";
  return name;
}
×
2233
// Name callback for the state compaction filter.
const char* compactFilteNameState(void* arg) {
  const char* name = "stream_filte_state";
  return name;
}
×
2234
// Name callback for the fill compaction filter.
const char* compactFilteNameFill(void* arg) {
  const char* name = "stream_filte_fill";
  return name;
}
×
2235
// Name callback for the func compaction filter.
const char* compactFilteNameFunc(void* arg) {
  const char* name = "stream_filte_func";
  return name;
}
×
2236

2237
// Session compaction filter callback. Not implemented yet: it ignores every
// argument and always returns 0.
unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
  unsigned char verdict = 0;
  return verdict;
}
2242

2243
// State compaction filter callback. Not implemented yet: it ignores every
// argument and always returns 0.
unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                                char** newval, size_t* newvlen, unsigned char* value_changed) {
  unsigned char verdict = 0;
  return verdict;
}
2248

2249
// Fill compaction filter callback. Not implemented yet: it ignores every
// argument and always returns 0.
unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
  unsigned char verdict = 0;
  return verdict;
}
2254

2255
// Func compaction filter callback. Not implemented yet: it ignores every
// argument and always returns 0. The stale-value check used by the default
// filter (streamStateValueIsStale on `val`) is not applied here.
unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
  unsigned char verdict = 0;
  return verdict;
}
2261

2262
// Factory callback: builds the default compaction filter, passing the factory
// state through as the filter state. `ctx` is not used.
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  SCompactFilteFactory* pFactory = arg;
  return rocksdb_compactionfilter_create(pFactory, destroyCompactFilte, compactFilte, compactFilteName);
}
2268
// Factory callback: builds the session compaction filter, passing the factory
// state through as the filter state. `ctx` is not used.
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  SCompactFilteFactory* pFactory = arg;
  return rocksdb_compactionfilter_create(pFactory, destroyCompactFilte, compactFilteSess, compactFilteNameSess);
}
2274
// Factory callback: builds the state compaction filter, passing the factory
// state through as the filter state. `ctx` is not used.
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  SCompactFilteFactory* pFactory = arg;
  return rocksdb_compactionfilter_create(pFactory, destroyCompactFilte, compactFilteState, compactFilteNameState);
}
2280
// Factory callback: builds the fill compaction filter, passing the factory
// state through as the filter state. `ctx` is not used.
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  SCompactFilteFactory* pFactory = arg;
  return rocksdb_compactionfilter_create(pFactory, destroyCompactFilte, compactFilteFill, compactFilteNameFill);
}
2286
// Factory callback: builds the func compaction filter, passing the factory
// state through as the filter state. `ctx` is not used.
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  SCompactFilteFactory* pFactory = arg;
  return rocksdb_compactionfilter_create(pFactory, destroyCompactFilte, compactFilteFunc, compactFilteNameFunc);
}
2292

2293
// Opens the RocksDB instance at `path` with the `nCf` column families named in
// `pCfNames`.
//
// Each name is mapped through getCfIdx() to pick its pre-built options from
// pTask->pCfOpts; after a successful open the returned handles are stored in
// pTask->pCf at the same index and the database handle in pTask->db.
//
// Returns 0 on success, terrno when a temporary array cannot be allocated, and -1
// when RocksDB reports an error (the message is logged). The temporary arrays are
// released on every path.
int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) {
  int32_t                          code = -1;
  char*                            err = NULL;
  rocksdb_t*                       db = NULL;
  rocksdb_options_t**              cfOpts = NULL;
  rocksdb_column_family_handle_t** cfHandle = NULL;

  cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
  if (cfOpts == NULL) {
    code = terrno;
    goto _EXIT;
  }
  cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
  if (cfHandle == NULL) {
    // leave through _EXIT so the cfOpts array allocated above is not leaked
    code = terrno;
    goto _EXIT;
  }

  for (int i = 0; i < nCf; i++) {
    int32_t idx = getCfIdx(pCfNames[i]);
    cfOpts[i] = pTask->pCfOpts[idx];
  }

  db = rocksdb_open_column_families(pTask->dbOpt, path, nCf, (const char* const*)pCfNames,
                                    (const rocksdb_options_t* const*)cfOpts, cfHandle, &err);

  if (err != NULL) {
    stError("failed to open cf path: %s", err);
    taosMemoryFree(err);
    goto _EXIT;
  }

  for (int i = 0; i < nCf; i++) {
    int32_t idx = getCfIdx(pCfNames[i]);
    pTask->pCf[idx] = cfHandle[i];
  }

  pTask->db = db;
  code = 0;

_EXIT:
  // only the temporary arrays are freed; the handles themselves now live in pTask->pCf
  taosMemoryFree(cfOpts);
  taosMemoryFree(cfHandle);
  return code;
}
2333

2334
void* taskDbAddRef(void* pTaskDb) {
3✔
2335
  STaskDbWrapper* pBackend = pTaskDb;
3✔
2336
  return taosAcquireRef(taskDbWrapperId, pBackend->refId);
3✔
2337
}
2338

2339
void taskDbRemoveRef(void* pTaskDb) {
17✔
2340
  if (pTaskDb == NULL) {
17!
2341
    return;
×
2342
  }
2343

2344
  STaskDbWrapper* pBackend = pTaskDb;
17✔
2345
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, pBackend->refId));
17✔
2346
}
2347

2348
void taskDbSetClearFileFlag(void* pTaskDb) {
×
2349
  if (pTaskDb == NULL) {
×
2350
    return;
×
2351
  }
2352

2353
  STaskDbWrapper* pBackend = pTaskDb;
×
2354
  atomic_store_8(&pBackend->removeAllFiles, 1);
×
2355
}
2356

2357
// Builds all RocksDB option objects for a task db and stores them on pTaskDb:
//   - database-wide options, environment, block cache, default compaction filter
//     factory, read options and write options (WAL disabled)
//   - for every ginitDict entry: a copy of the database options with its own
//     block-based table options, bloom filter policy, comparator and compaction
//     filter factory
// When any of the per-column-family arrays cannot be allocated, the error is
// logged, the arrays are freed/cleared and the function returns early; the
// database-wide objects created before that point are left in place.
void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
  rocksdb_env_t* env = rocksdb_create_default_env();

  // NOTE(review): the LRU capacity argument is the literal 256 - confirm the
  // intended unit/size against the RocksDB C API.
  rocksdb_cache_t*   cache = rocksdb_cache_create_lru(256);
  rocksdb_options_t* dbOpts = rocksdb_options_create();

  rocksdb_options_set_env(dbOpts, env);
  rocksdb_options_set_create_if_missing(dbOpts, 1);
  rocksdb_options_set_create_missing_column_families(dbOpts, 1);
  rocksdb_options_set_max_write_buffer_number(dbOpts, 3);
  rocksdb_options_set_info_log_level(dbOpts, 1);
  rocksdb_options_set_db_write_buffer_size(dbOpts, 256 << 20);  // 256 MiB
  rocksdb_options_set_write_buffer_size(dbOpts, 128 << 20);     // 128 MiB
  rocksdb_options_set_atomic_flush(dbOpts, 1);

  pTaskDb->dbOpt = dbOpts;
  pTaskDb->env = env;
  pTaskDb->cache = cache;
  pTaskDb->filterFactory = rocksdb_compactionfilterfactory_create(
      NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
  rocksdb_options_set_compaction_filter_factory(pTaskDb->dbOpt, pTaskDb->filterFactory);

  pTaskDb->readOpt = rocksdb_readoptions_create();
  pTaskDb->writeOpt = rocksdb_writeoptions_create();
  rocksdb_writeoptions_disable_WAL(pTaskDb->writeOpt, 1);

  // one slot per ginitDict entry in each of the per-column-family arrays
  size_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
  pTaskDb->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
  pTaskDb->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
  pTaskDb->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
  pTaskDb->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));

  int allAllocated = (pTaskDb->pCf != NULL) && (pTaskDb->pCfParams != NULL) && (pTaskDb->pCfOpts != NULL) &&
                     (pTaskDb->pCompares != NULL);
  if (!allAllocated) {
    stError("failed to alloc memory for cf");
    taosMemoryFreeClear(pTaskDb->pCf);
    taosMemoryFreeClear(pTaskDb->pCfParams);
    taosMemoryFreeClear(pTaskDb->pCfOpts);
    taosMemoryFreeClear(pTaskDb->pCompares);
    return;
  }

  for (size_t cfIdx = 0; cfIdx < nCf; cfIdx++) {
    rocksdb_options_t* cfOpt = rocksdb_options_create_copy(pTaskDb->dbOpt);

    // table format: shared block cache, partitioned filters, bloom filter policy
    rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
    rocksdb_block_based_options_set_block_cache(tableOpt, pTaskDb->cache);
    rocksdb_block_based_options_set_partition_filters(tableOpt, 1);

    rocksdb_filterpolicy_t* bloom = rocksdb_filterpolicy_create_bloom(15);
    rocksdb_block_based_options_set_filter_policy(tableOpt, bloom);

    rocksdb_options_set_block_based_table_factory(cfOpt, tableOpt);

    // comparator and compaction filter factory come from this column family's ginitDict entry
    SCfInit* pInit = &ginitDict[cfIdx];

    rocksdb_comparator_t* cmp = rocksdb_comparator_create(NULL, pInit->destroyCmp, pInit->cmpKey, pInit->cmpName);
    rocksdb_options_set_comparator(cfOpt, cmp);

    rocksdb_compactionfilterfactory_t* cfFilterFactory =
        rocksdb_compactionfilterfactory_create(NULL, pInit->destroyFilter, pInit->createFilter, pInit->funcName);
    rocksdb_options_set_compaction_filter_factory(cfOpt, cfFilterFactory);

    pTaskDb->pCompares[cfIdx] = cmp;
    pTaskDb->pCfOpts[cfIdx] = cfOpt;
    pTaskDb->pCfParams[cfIdx].tableOpt = tableOpt;
  }
}
2424
// Initializes the checkpoint bookkeeping of a task db: chkpId starts at -1,
// chkpCap at 4, the saved/in-use id arrays are created with an initial capacity
// of 4, the saved list is loaded via taskDbLoadChkpInfo() (its result is
// discarded) and the rwlock guarding the checkpoint directory is initialized.
void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) {
  pTaskDb->chkpId = -1;
  pTaskDb->chkpCap = 4;

  // the saved-id array has to exist before the stored info is loaded
  pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t));
  TAOS_UNUSED(taskDbLoadChkpInfo(pTaskDb));

  pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t));

  TAOS_UNUSED(taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL));
}
22✔
2434

2435
// Records checkpoint id `chkp` as in use: under the chkpDirLock write lock the id
// is appended to chkpInUse (a failed push is only logged) and the array is
// re-sorted with chkpIdComp.
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
  TAOS_UNUSED(taosThreadRwlockWrlock(&pTaskDb->chkpDirLock));

  void* pPushed = taosArrayPush(pTaskDb->chkpInUse, &chkp);
  if (pPushed == NULL) {
    stError("failed to push chkp: %" PRIi64 " into inuse", chkp);
  }
  taosArraySort(pTaskDb->chkpInUse, chkpIdComp);

  TAOS_UNUSED(taosThreadRwlockUnlock(&pTaskDb->chkpDirLock));
}
3✔
2443

2444
// Drops one in-use record for checkpoint id `chkp`: under the chkpDirLock write
// lock the first matching entry of chkpInUse is removed. Nothing happens when
// the id is not present.
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
  TAOS_UNUSED(taosThreadRwlockWrlock(&pTaskDb->chkpDirLock));

  int32_t nInUse = taosArrayGetSize(pTaskDb->chkpInUse);
  int     pos = 0;
  while (pos < nInUse) {
    int64_t* pId = taosArrayGet(pTaskDb->chkpInUse, pos);
    if (*pId == chkp) {
      taosArrayRemove(pTaskDb->chkpInUse, pos);
      break;
    }
    pos++;
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&pTaskDb->chkpDirLock));
}
3✔
2456

2457
// Tears down the checkpoint bookkeeping created by taskDbInitChkpOpt(): both id
// arrays are destroyed, then the chkpDirLock rwlock.
void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
  taosArrayDestroy(pTaskDb->chkpSaved);
  taosArrayDestroy(pTaskDb->chkpInUse);

  TAOS_UNUSED(taosThreadRwlockDestroy(&pTaskDb->chkpDirLock));
}
16✔
2462

2463
int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
×
2464
  int32_t code = 0;
×
2465
  int32_t cap = strlen(path) + 128, nBytes = 0;
×
2466
  char*   statePath = NULL;
×
2467
  char*   dbPath = NULL;
×
2468

2469
  statePath = taosMemoryCalloc(1, cap);
×
2470
  if (statePath == NULL) {
×
2471
    TAOS_CHECK_GOTO(terrno, NULL, _err);
×
2472
  }
2473

2474
  nBytes = snprintf(statePath, cap, "%s%s%s", path, TD_DIRSEP, key);
×
2475
  if (nBytes < 0 || nBytes >= cap) {
×
2476
    code = TSDB_CODE_OUT_OF_RANGE;
×
2477
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2478
  }
2479

2480
  if (!taosDirExist(statePath)) {
×
2481
    code = taosMulMkDir(statePath);
×
2482
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2483
  }
2484

2485
  dbPath = taosMemoryCalloc(1, cap);
×
2486
  if (dbPath == NULL) {
×
2487
    TAOS_CHECK_GOTO(terrno, NULL, _err);
×
2488
  }
2489
  nBytes = snprintf(dbPath, cap, "%s%s%s", statePath, TD_DIRSEP, "state");
×
2490
  if (nBytes < 0 || nBytes >= cap) {
×
2491
    code = TSDB_CODE_OUT_OF_RANGE;
×
2492
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2493
  }
2494

2495
  if (!taosDirExist(dbPath)) {
×
2496
    code = taosMulMkDir(dbPath);
×
2497
    TAOS_CHECK_GOTO(code, NULL, _err);
×
2498
  }
2499

2500
  *dbFullPath = dbPath;
×
2501
  *stateFullPath = statePath;
×
2502
  return 0;
×
2503
_err:
×
2504
  stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code));
×
2505

2506
  taosMemoryFree(statePath);
×
2507
  taosMemoryFree(dbPath);
×
2508
  return code;
×
2509
}
2510

2511
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
×
2512
  STaskDbWrapper* p = pTaskDb;
×
2513
  TAOS_UNUSED(streamMutexLock(&p->mutex));
×
2514
  p->chkpId = chkpId;
×
2515
  TAOS_UNUSED(streamMutexUnlock(&p->mutex));
×
2516
}
×
2517

2518
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
22✔
2519
  char*   err = NULL;
22✔
2520
  char**  cfNames = NULL;
22✔
2521
  size_t  nCf = 0;
22✔
2522
  int32_t code = 0;
22✔
2523
  int32_t lino = 0;
22✔
2524

2525
  STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper));
22!
2526
  TSDB_CHECK_NULL(pTaskDb, code, lino, _EXIT, terrno);
22!
2527

2528
  pTaskDb->idstr = key ? taosStrdup(key) : NULL;
22!
2529
  pTaskDb->path = statePath ? taosStrdup(statePath) : NULL;
22!
2530

2531
  code = taosThreadMutexInit(&pTaskDb->mutex, NULL);
22✔
2532
  TSDB_CHECK_CODE(code, lino, _EXIT);
22!
2533

2534
  taskDbInitChkpOpt(pTaskDb);
22✔
2535
  taskDbInitOpt(pTaskDb);
22✔
2536

2537
  cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
22✔
2538
  if (nCf == 0) {
22✔
2539
    stInfo("%s newly create db in state-backend", key);
19!
2540
    // pre create db
2541
    pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
19✔
2542
    if (pTaskDb->db == NULL) {
19!
2543
      stError("%s open state-backend failed, reason:%s", key, err);
×
2544
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
2545
      goto _EXIT;
×
2546
    }
2547

2548
    rocksdb_close(pTaskDb->db);
19✔
2549
    pTaskDb->db = NULL;
19✔
2550

2551
    if (cfNames != NULL) {
19!
2552
      rocksdb_list_column_families_destroy(cfNames, nCf);
19✔
2553
    }
2554

2555
    taosMemoryFree(err);
19!
2556
    err = NULL;
19✔
2557

2558
    cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
19✔
2559
    if (err != NULL) {
19!
2560
      stError("%s failed to create column-family, %s, %" PRIzu ", reason:%s", key, dbPath, nCf, err);
×
2561
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
2562
      goto _EXIT;
×
2563
    }
2564
  }
2565

2566
  if ((code = taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf)) != 0) {
22!
2567
    goto _EXIT;
×
2568
  }
2569

2570
  if (cfNames != NULL) {
22!
2571
    rocksdb_list_column_families_destroy(cfNames, nCf);
22✔
2572
    cfNames = NULL;
22✔
2573
  }
2574

2575
  stDebug("init s-task backend in:%s, backend:%p, %s", dbPath, pTaskDb, key);
22✔
2576
  return pTaskDb;
22✔
2577

2578
_EXIT:
×
2579
  stError("%s taskDb open failed, %s at line:%d code:%s", key, __func__, lino, tstrerror(code));
×
2580

2581
  taskDbDestroy(pTaskDb, false);
×
2582
  if (err) taosMemoryFree(err);
×
2583
  if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);
×
2584
  return NULL;
×
2585
}
2586

2587
/*
 * Restore the checkpoint data of a task and open its state backend.
 *
 * path       - base directory handed to restoreCheckpointData()
 * key        - task identifier
 * chkptId    - checkpoint id to restore
 * processVer - out: version loaded by chkpLoadExtraInfo() on success
 * ppTaskDb   - out: the opened wrapper, or NULL when opening or loading the extra info failed
 *
 * Returns 0 on success, otherwise an error code. When restoreCheckpointData() fails the
 * function returns immediately and *ppTaskDb is not written.
 */
int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb) {
  char* statePath = NULL;
  char* dbPath = NULL;
  int   code = 0;
  if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) {
    stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 ", reason:%s", path, key,
            chkptId, tstrerror(code));
    // NOTE(review): statePath/dbPath are assumed to be released by restoreCheckpointData()
    // on failure - confirm before freeing them here
    return code;
  }

  STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath);
  if (pTaskDb == NULL) {
    code = TSDB_CODE_INVALID_PARA;
  } else {
    int64_t chkpId = -1, ver = -1;
    if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver)) == 0) {
      *processVer = ver;
    } else {
      stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 ", reason:%s", path, key, chkptId,
              tstrerror(code));
      taskDbDestroy(pTaskDb, false);
      // fall through to the common cleanup so both path strings are released and the
      // caller never sees the destroyed wrapper
      pTaskDb = NULL;
    }
  }

  taosMemoryFree(dbPath);
  taosMemoryFree(statePath);
  *ppTaskDb = pTaskDb;
  return code;
}
2617

2618
void taskDbDestroy(void* pDb, bool flush) {
16✔
2619
  STaskDbWrapper* wrapper = pDb;
16✔
2620
  if (wrapper == NULL) {
16!
2621
    return;
×
2622
  }
2623

2624
  int64_t st = taosGetTimestampMs();
16✔
2625
  streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr);
16✔
2626

2627
  stDebug("%s succ to destroy stream backend:%p", wrapper->idstr, wrapper);
16!
2628

2629
  int8_t nCf = tListLen(ginitDict);
16✔
2630
  if (flush && wrapper->removeAllFiles == 0) {
16!
2631
    if (wrapper->db && wrapper->pCf) {
16!
2632
      rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
16✔
2633
      rocksdb_flushoptions_set_wait(flushOpt, 1);
16✔
2634

2635
      char*                            err = NULL;
16✔
2636
      rocksdb_column_family_handle_t** cfs = taosMemoryCalloc(1, sizeof(rocksdb_column_family_handle_t*) * nCf);
16!
2637
      int                              numOfFlushCf = 0;
16✔
2638
      for (int i = 0; i < nCf; i++) {
128✔
2639
        if (wrapper->pCf[i] != NULL) {
112✔
2640
          cfs[numOfFlushCf++] = wrapper->pCf[i];
34✔
2641
        }
2642
      }
2643
      if (numOfFlushCf != 0) {
16!
2644
        rocksdb_flush_cfs(wrapper->db, flushOpt, cfs, numOfFlushCf, &err);
16✔
2645
        if (err != NULL) {
16!
2646
          stError("failed to flush all cfs, reason:%s", err);
×
2647
          taosMemoryFreeClear(err);
×
2648
        }
2649
      }
2650
      taosMemoryFree(cfs);
16!
2651
      rocksdb_flushoptions_destroy(flushOpt);
16✔
2652
    }
2653
  }
2654

2655
  if (wrapper->pCf != NULL) {
16!
2656
    for (int i = 0; i < nCf; i++) {
128✔
2657
      if (wrapper->pCf[i] != NULL) {
112✔
2658
        rocksdb_column_family_handle_destroy(wrapper->pCf[i]);
34✔
2659
      }
2660
    }
2661
  }
2662

2663
  if (wrapper->db) {
16!
2664
    rocksdb_close(wrapper->db);
16✔
2665
    wrapper->db = NULL;
16✔
2666
  }
2667

2668
  rocksdb_options_destroy(wrapper->dbOpt);
16✔
2669
  rocksdb_readoptions_destroy(wrapper->readOpt);
16✔
2670
  rocksdb_writeoptions_destroy(wrapper->writeOpt);
16✔
2671
  rocksdb_env_destroy(wrapper->env);
16✔
2672
  rocksdb_cache_destroy(wrapper->cache);
16✔
2673

2674
  taosMemoryFree(wrapper->pCf);
16!
2675
  for (int i = 0; i < nCf; i++) {
128✔
2676
    rocksdb_options_t*                   opt = wrapper->pCfOpts[i];
112✔
2677
    rocksdb_comparator_t*                compare = wrapper->pCompares[i];
112✔
2678
    rocksdb_block_based_table_options_t* tblOpt = wrapper->pCfParams[i].tableOpt;
112✔
2679

2680
    rocksdb_options_destroy(opt);
112✔
2681
    rocksdb_comparator_destroy(compare);
112✔
2682
    rocksdb_block_based_options_destroy(tblOpt);
112✔
2683
  }
2684

2685
  taosMemoryFree(wrapper->pCompares);
16!
2686
  taosMemoryFree(wrapper->pCfOpts);
16!
2687
  taosMemoryFree(wrapper->pCfParams);
16!
2688

2689
  streamMutexDestroy(&wrapper->mutex);
16✔
2690
  taskDbDestroyChkpOpt(wrapper);
16✔
2691

2692
  if (wrapper->removeAllFiles) {
16!
2693
    char* err = NULL;
×
2694
    stInfo("drop task remove backend data:%s", wrapper->path);
×
2695
    taosRemoveDir(wrapper->path);
×
2696
  }
2697

2698
  int64_t et = taosGetTimestampMs();
16✔
2699
  stDebug("%s destroy stream backend:%p completed, elapsed time:%.2fs", wrapper->idstr, wrapper, (et - st)/1000.0);
16!
2700

2701
  taosMemoryFree(wrapper->idstr);
16!
2702
  taosMemoryFree(wrapper->path);
16!
2703
  taosMemoryFree(wrapper);
16!
2704
}
2705

2706
/*
 * Single-argument variant of taskDbDestroy() that always requests a flush.
 */
void taskDbDestroy2(void* pDb) {
  const bool flush = true;
  taskDbDestroy(pDb, flush);
}
14✔
2707

2708
/*
 * Resolve the directory of checkpoint `chkpId` for an rsync upload:
 *   <pDb->path>/checkpoints/checkpoint<chkpId>
 *
 * pDb    - task db wrapper; a reference on pDb->refId is held for the duration of the call
 * chkpId - checkpoint id
 * path   - out: heap string owned by the caller, written only when the directory exists
 *
 * Returns 0 on success, terrno when the reference or the buffer cannot be obtained, and
 * TSDB_CODE_OUT_OF_RANGE when the path does not fit the buffer.
 * NOTE(review): when the directory does not exist the function still returns 0 and leaves
 * *path untouched - confirm that callers expect this.
 */
int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
  int32_t code = 0;
  int64_t refId = pDb->refId;
  int32_t nBytes = 0;
  char*   buf = NULL;

  if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
    code = terrno;
    return code;
  }

  int32_t cap = strlen(pDb->path) + 128;

  buf = taosMemoryCalloc(1, cap);
  if (buf == NULL) {
    // capture the allocation error before taosReleaseRef() gets a chance to overwrite it
    code = terrno;
    goto _exit;
  }

  nBytes =
      snprintf(buf, cap, "%s%s%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    taosMemoryFree(buf);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _exit;
  }

  if (taosIsDir(buf)) {
    code = 0;
    *path = buf;  // ownership moves to the caller
  } else {
    taosMemoryFree(buf);
  }

_exit:
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
  return code;
}
2744

2745
/*
 * Prepare the temporary directory <pDb->path>/tmp<chkpId> for an s3 upload and let
 * bkdMgtGetDelta() fill it / `list` for checkpoint `chkpId`.
 *
 * pDb        - task db wrapper (path and idstr are read)
 * bkdChkpMgt - SBkdMgt passed as an opaque pointer
 * chkpId     - checkpoint id
 * path       - out: heap string with the temporary directory, owned by the caller; it is
 *              written whenever bkdMgtGetDelta() has been called, even if that call failed
 * list       - forwarded to bkdMgtGetDelta()
 * idStr      - identifier forwarded to cleanDir()
 *
 * Returns the result of bkdMgtGetDelta(), or an error code when the directory cannot be
 * prepared (in which case *path is not written).
 */
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list,
                                    const char* idStr) {
  int32_t  code = 0;
  int32_t  cap = strlen(pDb->path) + 32;
  SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;

  char* temp = taosMemoryCalloc(1, cap);
  if (temp == NULL) {
    return terrno;
  }

  int32_t nBytes = snprintf(temp, cap, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    taosMemoryFree(temp);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  if (taosDirExist(temp)) {
    // reuse the directory, but start from an empty one
    cleanDir(temp, idStr);
  } else {
    code = taosMkDir(temp);
    if (code != 0) {
      // translate errno first: releasing the buffer may disturb it
      code = TAOS_SYSTEM_ERROR(ERRNO);
      taosMemoryFree(temp);
      return code;
    }
  }

  code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp);
  *path = temp;

  return code;
}
2777

2778
/*
 * Produce the data to upload for checkpoint `chkpId`, dispatching on the backup type.
 * The checkpoint is referenced for the duration of the call.
 *
 * arg   - STaskDbWrapper passed as an opaque pointer
 * mgt   - backend checkpoint manager, used by the s3 variant only
 * type  - ECHECKPOINT_BACKUP_TYPE value
 * path, list, idStr - forwarded to the type-specific implementation
 *
 * Returns the result of the selected implementation, or -1 for any other type.
 */
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list,
                                const char* idStr) {
  STaskDbWrapper*         pDb = arg;
  ECHECKPOINT_BACKUP_TYPE utype = type;
  int32_t                 code = -1;

  taskDbRefChkp(pDb, chkpId);

  switch (utype) {
    case DATA_UPLOAD_RSYNC:
      code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
      break;
    case DATA_UPLOAD_S3:
      code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list, idStr);
      break;
    default:
      // unknown backup type: keep the -1 default
      break;
  }

  taskDbUnRefChkp(pDb, chkpId);
  return code;
}
2793

2794
/*
 * Make sure the column family named by `key` is opened in `pDb`, creating it when its
 * slot in pDb->pCf is still empty.
 *
 * Returns 0 when the column family is available (already opened or newly created) and
 * -1 when `key` is unknown or the creation failed.
 */
int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) {
  int32_t code = 0;
  char*   err = NULL;
  int8_t  idx = getCfIdx(key);

  if (idx == -1) return -1;

  if (pDb->pCf[idx] != NULL) return code;

  rocksdb_column_family_handle_t* cf =
      rocksdb_create_column_family(pDb->db, pDb->pCfOpts[idx], ginitDict[idx].key, &err);
  if (err != NULL) {
    stError("failed to open cf, key:%s, reason: %s", key, err);
    // release the handle object handed back alongside the error, as streamStateGetCfIdx() does
    if (cf != NULL) {
      rocksdb_column_family_handle_destroy(cf);
    }
    taosMemoryFree(err);
    code = -1;
    return code;
  }

  pDb->pCf[idx] = cf;
  return code;
}
2815
/*
 * Copy every key/value pair of column family slot `i` from the source instance into the
 * same slot of the destination task db, writing in batches of up to 1024 entries.
 *
 * pSrc - source instance; pSrc->pHandle[i] is scanned from the first key
 * pDst - destination; pDst->pCf[i] receives the entries
 * i    - column family slot shared by both sides
 *
 * Returns 0 on success and -1 when a write fails or the source scan ends with an error.
 */
int32_t copyDataAt(RocksdbCfInst* pSrc, STaskDbWrapper* pDst, int8_t i) {
  int32_t WRITE_BATCH = 1024;
  char*   err = NULL;
  int     code = 0;

  rocksdb_readoptions_t* pRdOpt = rocksdb_readoptions_create();

  rocksdb_writebatch_t* wb = rocksdb_writebatch_create();
  rocksdb_iterator_t*   pIter = rocksdb_create_iterator_cf(pSrc->db, pRdOpt, pSrc->pHandle[i]);
  rocksdb_iter_seek_to_first(pIter);
  while (rocksdb_iter_valid(pIter)) {
    // flush the batch once it is full, before adding the current entry
    if (rocksdb_writebatch_count(wb) >= WRITE_BATCH) {
      rocksdb_write(pDst->db, pDst->writeOpt, wb, &err);
      if (err != NULL) {
        code = -1;
        goto _EXIT;
      }
      rocksdb_writebatch_clear(wb);
    }

    size_t klen = 0, vlen = 0;
    char*  key = (char*)rocksdb_iter_key(pIter, &klen);
    char*  val = (char*)rocksdb_iter_value(pIter, &vlen);

    rocksdb_writebatch_put_cf(wb, pDst->pCf[i], key, klen, val, vlen);
    rocksdb_iter_next(pIter);
  }

  // an iterator also becomes invalid when the scan hits an error; do not report a
  // truncated copy as success
  rocksdb_iter_get_error(pIter, &err);
  if (err != NULL) {
    code = -1;
    goto _EXIT;
  }

  // write whatever is left in the last, partially filled batch
  if (rocksdb_writebatch_count(wb) > 0) {
    rocksdb_write(pDst->db, pDst->writeOpt, wb, &err);
    if (err != NULL) {
      code = -1;
      goto _EXIT;
    }
  }

_EXIT:
  rocksdb_writebatch_destroy(wb);
  rocksdb_iter_destroy(pIter);
  rocksdb_readoptions_destroy(pRdOpt);
  taosMemoryFree(err);

  return code;
}
2859

2860
int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) {
2✔
2861
  int nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
2✔
2862

2863
  int32_t code = 0;
2✔
2864

2865
  int64_t         processVer = -1;
2✔
2866
  STaskDbWrapper* pTaskDb = NULL;
2✔
2867

2868
  code = taskDbOpen(path, key, 0, &processVer, &pTaskDb);
2✔
2869
  RocksdbCfInst* pSrcBackend = pCfInst;
2✔
2870

2871
  for (int i = 0; i < nCf; i++) {
16✔
2872
    rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i];
14✔
2873
    if (pSrcCf == NULL) continue;
14✔
2874

2875
    code = taskDbOpenCfByKey(pTaskDb, ginitDict[i].key);
3✔
2876
    if (code != 0) goto _EXIT;
3!
2877

2878
    code = copyDataAt(pSrcBackend, pTaskDb, i);
3✔
2879
    if (code != 0) goto _EXIT;
3!
2880
  }
2881

2882
_EXIT:
2✔
2883
  taskDbDestroy(pTaskDb, true);
2✔
2884

2885
  return code;
2✔
2886
}
2887
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) {
1✔
2888
  SBackendWrapper* handle = backend;
1✔
2889
  char*            err = NULL;
1✔
2890
  int64_t          streamId;
2891
  int32_t          taskId, dummy = 0;
1✔
2892
  char             suffix[64] = {0};
1✔
2893
  int32_t          code = 0;
1✔
2894

2895
  rocksdb_options_t**              cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
1!
2896
  RocksdbCfParam*                  params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
1!
2897
  rocksdb_comparator_t**           pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
1!
2898
  rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
1!
2899

2900
  for (int i = 0; i < nCf; i++) {
5✔
2901
    char* cf = cfs[i];
4✔
2902
    char  funcname[64] = {0};
4✔
2903

2904
    cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt);
4✔
2905
    if (i == 0) continue;
4✔
2906
    if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
3!
2907
      rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
3✔
2908
      rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
3✔
2909
      rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
3✔
2910

2911
      rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
3✔
2912
      rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
3✔
2913

2914
      rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt);
3✔
2915
      params[i].tableOpt = tableOpt;
3✔
2916

2917
      int idx = streamStateGetCfIdx(NULL, funcname);
3✔
2918
      if (idx < 0 || idx >= sizeof(ginitDict) / sizeof(ginitDict[0])) {
3!
2919
        stError("failed to open cf");
×
2920
        return -1;
×
2921
      }
2922
      SCfInit* cfPara = &ginitDict[idx];
3✔
2923

2924
      rocksdb_comparator_t* compare =
2925
          rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
3✔
2926
      rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare);
3✔
2927
      pCompare[i] = compare;
3✔
2928
    }
2929
  }
2930
  rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nCf, (const char* const*)cfs,
1✔
2931
                                               (const rocksdb_options_t* const*)cfOpts, cfHandle, &err);
2932
  if (err != NULL) {
1!
2933
    stError("failed to open rocksdb cf, reason:%s", err);
×
2934
    taosMemoryFree(err);
×
2935
    taosMemoryFree(cfHandle);
×
2936
    taosMemoryFree(pCompare);
×
2937
    taosMemoryFree(params);
×
2938
    taosMemoryFree(cfOpts);
×
2939
    // fix other leak
2940
    return TSDB_CODE_THIRDPARTY_ERROR;
×
2941
  } else {
2942
    stDebug("succ to open rocksdb cf");
1!
2943
  }
2944
  // close default cf
2945
  if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) {
1!
2946
    rocksdb_column_family_handle_destroy(cfHandle[0]);
1✔
2947
    cfHandle[0] = NULL;
1✔
2948
  }
2949
  rocksdb_options_destroy(cfOpts[0]);
1✔
2950

2951
  handle->db = db;
1✔
2952

2953
  static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
2954
  for (int i = 0; i < nCf; i++) {
5✔
2955
    char* cf = cfs[i];
4✔
2956
    if (i == 0) continue;  // skip default column family, not set opt
4✔
2957

2958
    char funcname[64] = {0};
3✔
2959
    if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
3!
2960
      char    idstr[128] = {0};
3✔
2961
      int32_t nBytes = snprintf(idstr, sizeof(idstr), "0x%" PRIx64 "-%d", streamId, taskId);
3✔
2962
      if (nBytes <= 0 || nBytes >= sizeof(idstr)) {
3!
2963
        code = TSDB_CODE_OUT_OF_RANGE;
×
2964
        stError("failed to open cf since %s", tstrerror(code));
×
2965
        return code;
×
2966
      }
2967

2968
      int idx = streamStateGetCfIdx(NULL, funcname);
3✔
2969

2970
      RocksdbCfInst*  inst = NULL;
3✔
2971
      RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1);
3✔
2972
      if (pInst == NULL || *pInst == NULL) {
3!
2973
        inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst));
2!
2974
        inst->pHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
2!
2975
        inst->cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
2!
2976
        inst->wOpt = rocksdb_writeoptions_create();
2✔
2977
        inst->rOpt = rocksdb_readoptions_create();
2✔
2978
        inst->param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
2!
2979
        inst->pBackend = handle;
2✔
2980
        inst->db = db;
2✔
2981
        inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
2!
2982

2983
        inst->dbOpt = handle->dbOpt;
2✔
2984
        rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
2✔
2985
        TAOS_UNUSED(taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)));
2✔
2986
      } else {
2987
        inst = *pInst;
1✔
2988
      }
2989
      inst->cfOpt[idx] = cfOpts[i];
3✔
2990
      inst->pCompares[idx] = pCompare[i];
3✔
2991
      memcpy(&(inst->param[idx]), &(params[i]), sizeof(RocksdbCfParam));
3✔
2992
      inst->pHandle[idx] = cfHandle[i];
3✔
2993
    }
2994
  }
2995
  void* pIter = taosHashIterate(handle->cfInst, NULL);
1✔
2996
  while (pIter) {
3✔
2997
    RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
2✔
2998

2999
    for (int i = 0; i < cfLen; i++) {
16✔
3000
      if (inst->cfOpt[i] == NULL) {
14✔
3001
        rocksdb_options_t*                   opt = rocksdb_options_create_copy(handle->dbOpt);
11✔
3002
        rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
11✔
3003
        rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
11✔
3004
        rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
11✔
3005

3006
        rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
11✔
3007
        rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
11✔
3008

3009
        rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt);
11✔
3010

3011
        SCfInit* cfPara = &ginitDict[i];
11✔
3012

3013
        rocksdb_comparator_t* compare =
3014
            rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
11✔
3015
        rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
11✔
3016

3017
        inst->pCompares[i] = compare;
11✔
3018
        inst->cfOpt[i] = opt;
11✔
3019
        inst->param[i].tableOpt = tableOpt;
11✔
3020
      }
3021
    }
3022
    SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen};
2✔
3023
    inst->pCompareNode = streamBackendAddCompare(handle, &compare);
2✔
3024
    pIter = taosHashIterate(handle->cfInst, pIter);
2✔
3025
  }
3026

3027
  taosMemoryFree(cfHandle);
1!
3028
  taosMemoryFree(pCompare);
1!
3029
  taosMemoryFree(params);
1!
3030
  taosMemoryFree(cfOpts);
1!
3031
  return 0;
1✔
3032
}
3033

3034
void streamStateDestroyCompar(void* arg) {
2✔
3035
  SCfComparator* comp = (SCfComparator*)arg;
2✔
3036
  for (int i = 0; i < comp->numOfComp; i++) {
16✔
3037
    if (comp->comp[i]) rocksdb_comparator_destroy(comp->comp[i]);
14!
3038
  }
3039
  taosMemoryFree(comp->comp);
2!
3040
}
2✔
3041

3042
/*
 * Map a column family name to its slot in ginitDict and, when a stream state is given,
 * make sure that column family is opened in the state's backend.
 *
 * pState   - stream state; when NULL only the name lookup is performed
 * funcName - column family name to look up
 *
 * With a non-NULL pState the backend is pOwner->pBackend, or pOwner->pRecalBackend when
 * pTdbState->recalc is set. If the slot in the backend's pCf is still empty, the column
 * family is created while holding the backend's mutex.
 *
 * Returns the slot index, or -1 when the name is unknown, the backend is NULL, or the
 * column family cannot be created.
 */
int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
  int    idx = -1;
  size_t len = strlen(funcName);  // computed once, reused for every comparison below
  for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
    if (len == ginitDict[i].len && strncmp(funcName, ginitDict[i].key, len) == 0) {
      idx = i;
      break;
    }
  }
  if (pState != NULL && idx != -1) {
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
    if (pState->pTdbState->recalc) {
      wrapper = pState->pTdbState->pOwner->pRecalBackend;
    }
    if (wrapper == NULL) {
      return -1;
    }

    streamMutexLock(&wrapper->mutex);

    rocksdb_column_family_handle_t* cf = wrapper->pCf[idx];
    if (cf == NULL) {
      // first use of this column family in this backend: create it on demand
      char* err = NULL;
      cf = rocksdb_create_column_family(wrapper->db, wrapper->pCfOpts[idx], ginitDict[idx].key, &err);
      if (err != NULL) {
        idx = -1;
        stError("failed to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err);
        if (cf != NULL) {
          rocksdb_column_family_handle_destroy(cf);
        }
        taosMemoryFree(err);
      } else {
        stDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName);
        wrapper->pCf[idx] = cf;
      }
    }
    streamMutexUnlock(&wrapper->mutex);
  }

  return idx;
}
3081
/*
 * Position `iter` at the given key: first with a forward seek and, if that leaves the
 * iterator invalid, with a seek-for-prev.
 *
 * Returns true when the iterator ends up valid, false otherwise.
 */
bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) {
  rocksdb_iter_seek(iter, buf, len);
  if (rocksdb_iter_valid(iter)) {
    return true;
  }

  // nothing at or after the key: fall back to the closest entry at or before it
  rocksdb_iter_seek_for_prev(iter, buf, len);
  return rocksdb_iter_valid(iter) ? true : false;
}
3091
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKeyName, rocksdb_snapshot_t** snapshot,
231✔
3092
                                          rocksdb_readoptions_t** readOpt) {
3093
  int idx = streamStateGetCfIdx(pState, cfKeyName);
231✔
3094

3095
  *readOpt = rocksdb_readoptions_create();
231✔
3096

3097
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
231✔
3098
  if (pState->pTdbState->recalc) {
231!
3099
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
×
3100
  }
3101
  if (snapshot != NULL) {
231!
3102
    *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->db);
231✔
3103
    rocksdb_readoptions_set_snapshot(*readOpt, *snapshot);
231✔
3104
    rocksdb_readoptions_set_fill_cache(*readOpt, 0);
231✔
3105
  }
3106

3107
  return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]);
231✔
3108
}
3109

3110
/*
 * STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen)
 *
 * Encode `key` and `value` with the encoders registered in ginitDict for column family
 * `funcname` and write the pair into the state's backend. The backend is
 * pOwner->pRecalBackend when pTdbState->recalc is set and pOwner->pBackend otherwise, the
 * same selection streamStateGetCfIdx() and the GET/DEL macros use.
 *
 * The macro assigns its result to a variable named `code` that must exist in the
 * expanding scope: 0 on success, TSDB_CODE_THIRDPARTY_ERROR when the column family cannot
 * be resolved or the write fails.
 */
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen)                                                 \
  do {                                                                                                               \
    code = 0;                                                                                                        \
    char  buf[128] = {0};                                                                                            \
    char* err = NULL;                                                                                                \
    int   i = streamStateGetCfIdx(pState, funcname);                                                                 \
    if (i < 0) {                                                                                                     \
      stWarn("streamState failed to get cf name: %s", funcname);                                                     \
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                             \
      break;                                                                                                         \
    }                                                                                                                \
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;                                                   \
    /* write to the backend in which streamStateGetCfIdx() opened the column family */                              \
    if (pState->pTdbState->recalc) {                                                                                 \
      wrapper = pState->pTdbState->pOwner->pRecalBackend;                                                            \
    }                                                                                                                \
    TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));                                                      \
    char toString[128] = {0};                                                                                        \
    TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString)));                                                     \
    int32_t                         klen = ginitDict[i].enFunc((void*)key, buf);                                     \
    rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx];    \
    rocksdb_writeoptions_t*         opts = wrapper->writeOpt;                                                        \
    rocksdb_t*                      db = wrapper->db;                                                                \
    char*                           ttlV = NULL;                                                                     \
    int32_t                         ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV);                \
    rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err);             \
    if (err != NULL) {                                                                                               \
      stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err);                        \
      taosMemoryFree(err);                                                                                           \
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                             \
    } else {                                                                                                         \
      stDebug("[InternalERR] write streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, \
             funcname, vLen, ttlVLen, wrapper);                                                                      \
    }                                                                                                                \
    taosMemoryFree(ttlV);                                                                                            \
  } while (0);
3142

3143
/*
 * STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen)
 *
 * Encode `key` with the encoder registered in ginitDict for column family `funcname`,
 * read the stored value from the state's backend (pOwner->pRecalBackend when
 * pTdbState->recalc is set, pOwner->pBackend otherwise) and decode it into *pVal.
 * When `vLen` is not NULL and a raw value was found, the decoded length is stored in *vLen.
 *
 * The macro assigns its result to a variable named `code` that must exist in the
 * expanding scope: 0 on success, -1 when the column family cannot be resolved, the key is
 * absent, the read fails, or the decoder reports a non-positive length.
 */
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen)                                                   \
  do {                                                                                                                \
    code = 0;                                                                                                         \
    char  buf[128] = {0};                                                                                             \
    char* err = NULL;                                                                                                 \
    int   i = streamStateGetCfIdx(pState, funcname);                                                                  \
    if (i < 0) {                                                                                                      \
      stWarn("streamState failed to get cf name: %s", funcname);                                                      \
      code = -1;                                                                                                      \
      break;                                                                                                          \
    }                                                                                                                 \
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;                                                    \
    if (pState->pTdbState->recalc) {                                                                                  \
      wrapper = pState->pTdbState->pOwner->pRecalBackend;                                                             \
    }                                                                                                                 \
    char toString[128] = {0};                                                                                         \
    /* the printable key is only built when trace logging is enabled */                                              \
    if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString)));                       \
    int32_t                         klen = ginitDict[i].enFunc((void*)key, buf);                                      \
    rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx];     \
    rocksdb_t*                      db = wrapper->db;                                                                 \
    rocksdb_readoptions_t*          opts = wrapper->readOpt;                                                          \
    size_t                          len = 0;                                                                          \
    char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, &len, &err);                                \
    if (val == NULL || len == 0) {                                                                                    \
      if (err == NULL) {                                                                                              \
        stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
      } else {                                                                                                        \
        stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err);   \
        taosMemoryFreeClear(err);                                                                                     \
      }                                                                                                               \
      code = -1;                                                                                                      \
    } else {                                                                                                          \
      int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal);                                          \
      if (tlen <= 0) {                                                                                                \
        stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr,         \
                funcname);                                                                                            \
        code = -1;                                                                                                    \
      } else {                                                                                                        \
        stTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname,     \
                tlen, wrapper);                                                                                       \
      }                                                                                                               \
      taosMemoryFree(val);                                                                                            \
      if (vLen != NULL) *vLen = tlen;                                                                                 \
    }                                                                                                                 \
  } while (0);
3189

3190
/*
 * Delete the entry stored under `key` in the column family named `funcname`.
 *
 * The macro expects an integer variable named `code` in the caller's scope and
 * writes the outcome into it: 0 on success, TSDB_CODE_THIRDPARTY_ERROR when the
 * column family cannot be resolved or rocksdb_delete_cf reports an error.
 * The recalc backend is used instead of the regular one when
 * pState->pTdbState->recalc is set.
 *
 * All macro parameters are parenthesized so that arbitrary expressions can be
 * passed safely. Note that `key` is evaluated more than once.
 */
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key)                                                             \
  do {                                                                                                              \
    code = 0;                                                                                                       \
    char  buf[128] = {0};                                                                                           \
    char* err = NULL;                                                                                               \
    int   i = streamStateGetCfIdx((pState), (funcname));                                                            \
    if (i < 0) {                                                                                                    \
      stWarn("streamState failed to get cf name: %s_%s", (pState)->pTdbState->idstr, (funcname));                   \
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                            \
      break;                                                                                                        \
    }                                                                                                               \
    STaskDbWrapper* wrapper = (pState)->pTdbState->pOwner->pBackend;                                                \
    if ((pState)->pTdbState->recalc) {                                                                              \
      wrapper = (pState)->pTdbState->pOwner->pRecalBackend;                                                         \
    }                                                                                                               \
    TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));                                                     \
    char toString[128] = {0};                                                                                       \
    /* the printable key is only built when trace logging is enabled */                                             \
    if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED(ginitDict[i].toStrFunc((void*)(key), toString));                     \
    int32_t                         klen = ginitDict[i].enFunc((void*)(key), buf);                                  \
    rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx];   \
    rocksdb_t*                      db = wrapper->db;                                                               \
    rocksdb_writeoptions_t*         opts = wrapper->writeOpt;                                                       \
    rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err);                                             \
    if (err != NULL) {                                                                                              \
      stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, (funcname), err);  \
      taosMemoryFree(err);                                                                                          \
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                            \
    } else {                                                                                                        \
      stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, (funcname));                  \
    }                                                                                                               \
  } while (0);
3221

3222
// state cf
3223
/*
 * Write `value` into the "state" column family under (key, pState->number).
 *
 * If both a resultRowPut callback and pExprSupp are configured, the value is
 * first converted by that callback and the converted buffer is stored (and
 * freed afterwards); otherwise the caller's buffer is stored unchanged.
 *
 * Returns 0 on success, the callback's error code when conversion fails, or
 * whatever STREAM_STATE_PUT_ROCKSDB leaves in `code`.
 */
int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  int       code = 0;
  SStateKey sKey = {.key = *key, .opNum = pState->number};

  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
    char*  dst = NULL;
    size_t size = 0;

    code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
    if (code != 0) {
      return code;
    }
    STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size);
    taosMemoryFree(dst);
    return code;
  }

  // no converter configured: store the raw bytes
  STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, (int32_t)vLen);
  return code;
}
3241
/*
 * Read the value stored under (key, pState->number) from the "state" column family.
 *
 * On success *pVal receives a buffer and *pVLen its length. When a resultRowGet
 * callback and pExprSupp are configured, the stored bytes are passed through the
 * callback and the intermediate buffer is freed here; otherwise the stored
 * buffer itself is handed to the caller.
 *
 * Returns the code left by STREAM_STATE_GET_ROCKSDB, or the callback's code.
 */
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  int       code = 0;
  SStateKey sKey = {.key = *key, .opNum = pState->number};
  char*     tVal = NULL;
  size_t    tValLen = 0;

  STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, &tVal, &tValLen);
  if (code != 0) {
    taosMemoryFree(tVal);
    return code;
  }

  if (pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
    size_t pValLen = 0;
    code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen);
    *pVLen = (int32_t)pValLen;
    taosMemoryFree(tVal);
    return code;
  }

  // no converter configured: hand the stored buffer over as is
  *pVal = tVal;
  *pVLen = tValLen;
  return code;
}
3263
/*
 * Remove the entry for (key, pState->number) from the "state" column family.
 * Returns the code set by STREAM_STATE_DEL_ROCKSDB (0 on success).
 */
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
  int       code = 0;
  SStateKey sKey = {.key = *key, .opNum = pState->number};

  STREAM_STATE_DEL_ROCKSDB(pState, "state", &sKey);

  return code;
}
3269
/*
 * Range-delete every state key of this operator (pState->number) from the column
 * family handle at wrapper->pCf[1], then compact that key range.
 *
 * A failed range delete is only logged as a warning; the function always returns 0.
 * NOTE(review): index 1 is presumably the "state" column family (the warning text
 * calls it cf(state)) - confirm against the column family table.
 */
int32_t streamStateClear_rocksdb(SStreamState* pState) {
  stDebug("streamStateClear_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend
                                                      : pState->pTdbState->pOwner->pBackend;

  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));

  // smallest and largest possible keys for this operator
  SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
  SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number};

  char sKeyStr[128] = {0};
  char eKeyStr[128] = {0};
  int  sLen = stateKeyEncode(&sKey, sKeyStr);
  int  eLen = stateKeyEncode(&eKey, eKeyStr);

  if (wrapper->pCf[1] == NULL) {
    return 0;
  }

  char* err = NULL;
  rocksdb_delete_range_cf(wrapper->db, wrapper->writeOpt, wrapper->pCf[1], sKeyStr, sLen, eKeyStr, eLen, &err);
  if (err == NULL) {
    rocksdb_compact_range_cf(wrapper->db, wrapper->pCf[1], sKeyStr, sLen, eKeyStr, eLen);
    return 0;
  }

  char toStringStart[128] = {0};
  char toStringEnd[128] = {0};
  TAOS_UNUSED(stateKeyToString(&sKey, toStringStart));
  TAOS_UNUSED(stateKeyToString(&eKey, toStringEnd));

  stWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
  taosMemoryFree(err);
  return 0;
}
3305
/*
 * Advance the cursor's iterator by one entry.
 * Does nothing when the cursor or its iterator is NULL, or when the iterator is
 * not positioned on a valid entry.
 */
void streamStateCurNext_rocksdb(SStreamStateCur* pCur) {
  if (pCur == NULL || pCur->iter == NULL) {
    return;
  }

  if (rocksdb_iter_valid(pCur->iter)) {
    rocksdb_iter_next(pCur->iter);
  }
}
×
3310
/*
 * Fetch the first key that follows the {ts = 0, groupId = 0} placeholder.
 *
 * A temporary placeholder entry is written, a cursor is positioned just after
 * it, the key under the cursor is copied into *key, and the placeholder is
 * removed again.
 *
 * The cursor is released and the placeholder deleted on every path after the
 * placeholder was written; previously a failed lookup leaked the cursor and
 * left the placeholder in the database.
 *
 * Returns 0 on success; otherwise the first error encountered (put, lookup,
 * or placeholder delete).
 */
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
  stDebug("streamStateGetFirst_rocksdb");

  SWinKey tmp = {.ts = 0, .groupId = 0};
  int32_t code = streamStatePut_rocksdb(pState, &tmp, NULL, 0);
  if (code != 0) {
    return code;
  }

  SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
  // a NULL cursor is reported as -1 by the lookup itself
  code = streamStateGetKVByCur_rocksdb(pState, pCur, key, NULL, NULL);
  if (pCur != NULL) {
    streamStateFreeCur(pCur);
  }

  // always remove the placeholder, but report the lookup error first
  int32_t delCode = streamStateDel_rocksdb(pState, &tmp);
  return (code != 0) ? code : delCode;
}
3327

3328
/*
 * Read the entry under the cursor and accept it only if it belongs to the group
 * that pKey->groupId named on entry.
 *
 * Returns 0 when the entry was read and its group matches. Returns -1 when the
 * cursor is NULL, the read fails, or the group differs; in the latter case any
 * value returned through pVal is freed and *pVal is reset to NULL.
 * Note that *pKey is overwritten by the underlying read.
 */
int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
                                               int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }

  // remember the requested group before pKey is overwritten
  const uint64_t wantedGroup = pKey->groupId;

  if (streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen) != 0) {
    return -1;
  }
  if (pKey->groupId == wantedGroup) {
    return 0;
  }

  if (pVal != NULL) {
    taosMemoryFree((void*)*pVal);
    *pVal = NULL;
  }
  return -1;
}
3347
/*
 * Return the stored state value for `key`, or a zero-filled buffer if none can be read.
 *
 * On entry *pVLen holds the size to allocate when the lookup fails. When the
 * lookup succeeds, *pVal / *pVLen are whatever streamStateGet_rocksdb produced.
 *
 * Returns 0 on success, or terrno when the fallback allocation fails.
 */
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  stDebug("streamStateAddIfNotExist_rocksdb");

  // capture the requested size before the lookup gets a chance to touch *pVLen
  const int32_t size = *pVLen;

  int32_t lookup = streamStateGet_rocksdb(pState, key, pVal, pVLen);
  if (lookup == 0) {
    return 0;
  }

  void* fresh = taosMemoryMalloc(size);
  *pVal = fresh;
  if (fresh == NULL) {
    return terrno;
  }

  memset(fresh, 0, size);
  return 0;
}
3360
/*
 * Move the cursor's iterator back by one entry.
 *
 * Mirrors streamStateCurNext_rocksdb: nothing happens when the cursor or its
 * iterator is NULL, or when the iterator is not positioned on a valid entry
 * (RocksDB requires a valid iterator before stepping).
 */
void streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
  if (pCur && pCur->iter && rocksdb_iter_valid(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }
}
×
3365
/*
 * Read the key (and optionally the value) the cursor currently points at.
 *
 * pState  state handle; may be NULL, in which case no result-row conversion is done
 * pCur    cursor; NULL yields -1
 * pKey    receives the decoded window key
 * pVal    optional; receives a heap buffer with the value that the caller must free
 * pVLen   optional; receives the value length. The value is only decoded when
 *         pVLen is non-NULL.
 *
 * Returns 0 on success; -1 when the cursor is NULL, the iterator is invalid or
 * stale, the entry belongs to another operator, or the value decodes to a
 * non-positive length; or the resultRowGet callback's error code.
 *
 * Fixes over the previous version:
 *  - valueDecode now receives &val; before, a NULL output pointer was passed
 *    (`(char**)val`), so the decoded value was never produced.
 *  - the diagnostic log no longer dereferences pState when it is NULL.
 */
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
                                      int32_t* pVLen) {
  if (!pCur) return -1;

  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    return -1;
  }

  SStateKey tkey;
  size_t    tlen = 0;
  char*     keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
  TAOS_UNUSED(stateKeyDecode((void*)&tkey, keyStr));
  if (tkey.opNum != pCur->number) {
    // entry belongs to a different operator
    return -1;
  }

  if (pVLen != NULL) {
    size_t      vlen = 0;
    const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
    char*       val = NULL;
    int32_t     len = valueDecode((void*)valStr, vlen, NULL, &val);
    if (len <= 0) {
      taosMemoryFree(val);
      return -1;
    }

    char*  tVal = val;
    size_t tVlen = len;

    if (pVal == NULL) {
      // caller only wants the length
      taosMemoryFree(val);
    } else if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
      int code =
          (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen);
      taosMemoryFree(val);
      if (code != 0) {
        return code;
      }
      *pVal = (char*)tVal;
    } else {
      if (pState != NULL) {
        stInfo("streamStateGetKVByCur_rocksdb, pState = %p, pResultRowStore = %p, pExprSupp = %p", pState,
               pState->pResultRowStore.resultRowGet, pState->pExprSupp);
      } else {
        stInfo("streamStateGetKVByCur_rocksdb, pState = %p, pResultRowStore = %p, pExprSupp = %p", (void*)NULL,
               (void*)NULL, (void*)NULL);
      }
      *pVal = (char*)tVal;
    }
    *pVLen = (int32_t)tVlen;
  }

  *pKey = tkey.key;
  return 0;
}
3418
/*
 * Open a fill cursor at `key` and keep it only if the entry under it can be read
 * and belongs to key->groupId.
 *
 * Returns the cursor on success, otherwise NULL (the cursor is released here).
 * Note that *key is overwritten by the group check.
 */
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
  SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
  if (pCur == NULL) {
    return NULL;
  }

  if (streamStateFillGetGroupKVByCur_rocksdb(pCur, key, NULL, 0) != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}
3427

3428
/*
 * Create a cursor on the "state" column family positioned on the first
 * non-stale entry strictly after (key, pState->number).
 *
 * The iterator is seeked to the encoded key and TTL-expired entries are skipped
 * forward. If the entry reached is already greater than the search key it is
 * the answer; if it equals the search key the iterator is advanced once more.
 *
 * Fix: the comparison used to be `> 0` (search key greater than current key),
 * which cannot hold after a forward seek, so an entry that was already past the
 * search key was skipped. It now matches streamStateSessionSeekKeyNext_rocksdb.
 * NOTE(review): assumes stateKeyCmpr(a, b) < 0 means a sorts before b - confirm.
 *
 * Returns the cursor (the caller releases it with streamStateFreeCur), or NULL
 * when allocation fails, the seek fails, or no valid entry remains. After the
 * extra advance the iterator may be invalid; callers must check.
 */
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }
  pCur->number = pState->number;
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);

  SStateKey sKey = {.key = *key, .opNum = pState->number};
  char      buf[128] = {0};
  int       len = stateKeyEncode((void*)&sKey, buf);
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // skip ttl expired data
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_next(pCur->iter);
  }

  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateKey curKey;
  size_t    kLen = 0;
  char*     keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
  TAOS_UNUSED(stateKeyDecode((void*)&curKey, keyStr));

  // already past the search key: this entry is the "next" one
  if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) {
    return pCur;
  }

  // sitting on the search key itself: step over it
  rocksdb_iter_next(pCur->iter);
  return pCur;
}
3468

3469
/*
 * Create a cursor on the "state" column family positioned on the last non-stale entry.
 *
 * A sentinel entry with the maximum possible key is written, the iterator is
 * seeked to it and stepped backwards (skipping TTL-expired entries), and the
 * sentinel is deleted again before returning.
 *
 * Fixes over the previous version:
 *  - the sentinel is now removed even when cursor allocation fails (it used to
 *    be left in the database on that path);
 *  - a NULL iterator from streamStateIterCreate is treated as "no cursor"
 *    instead of being passed to rocksdb_iter_seek.
 *
 * Returns the cursor (the caller releases it with streamStateFreeCur), or NULL
 * when the sentinel cannot be written, allocation fails, or no valid entry exists.
 */
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
  int32_t code = 0;

  const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
  STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
  if (code != 0) {
    return NULL;
  }

  {
    char tbuf[256] = {0};
    TAOS_UNUSED(stateKeyToString((void*)&maxStateKey, tbuf));
    stDebug("seek to last:%s", tbuf);
  }

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur != NULL) {
    pCur->number = pState->number;

    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
    if (pState->pTdbState->recalc) {
      wrapper = pState->pTdbState->pOwner->pRecalBackend;
    }
    pCur->db = ((STaskDbWrapper*)wrapper)->db;

    pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
                                       (rocksdb_readoptions_t**)&pCur->readOpt);
    if (pCur->iter == NULL) {
      streamStateFreeCur(pCur);
      pCur = NULL;
    } else {
      char    buf[128] = {0};
      int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);

      // land on the sentinel, then walk back to the last real, non-stale entry
      rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
      rocksdb_iter_prev(pCur->iter);
      while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
        rocksdb_iter_prev(pCur->iter);
      }

      if (!rocksdb_iter_valid(pCur->iter)) {
        streamStateFreeCur(pCur);
        pCur = NULL;
      }
    }
  }

  // the sentinel must never outlive this call; a delete error is not reported
  STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
  return pCur;
}
3514

3515
/*
 * Create a cursor on the "state" column family positioned exactly on
 * (key, pState->number).
 *
 * Returns the cursor only when the seek lands on a valid, non-stale entry whose
 * key compares equal to the search key; otherwise the cursor is released and
 * NULL is returned. NULL is also returned when allocation fails.
 */
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateGetCur_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend
                                                      : pState->pTdbState->pOwner->pBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateKey sKey = {.key = *key, .opNum = pState->number};
  char      buf[128] = {0};
  int       len = stateKeyEncode((void*)&sKey, buf);

  rocksdb_iter_seek(pCur->iter, buf, len);

  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateKey curKey;
  size_t    kLen = 0;
  char*     keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
  TAOS_UNUSED(stateKeyDecode((void*)&curKey, keyStr));

  // only an exact match counts
  if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) != 0) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}
3551

3552
// func cf
3553
/*
 * Write `value` into the "func" column family under `key`.
 *
 * If both a resultRowPut callback and pExprSupp are configured, the value is
 * converted first and the converted buffer is stored and then freed; otherwise
 * the caller's buffer is stored unchanged.
 *
 * Returns 0 on success, the callback's error code when conversion fails, or
 * whatever STREAM_STATE_PUT_ROCKSDB leaves in `code`.
 */
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
  int    code = 0;
  char*  dst = NULL;
  size_t size = 0;

  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
    code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
    if (code != 0) {
      return code;
    }
    STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, (int32_t)size);
    taosMemoryFree(dst);
    return code;
  }

  // no converter configured: store the raw bytes
  STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, (int32_t)vLen);
  return code;
}
3570
/*
 * Read the value stored under `key` from the "func" column family.
 *
 * On success *pVal receives a buffer and *pVLen its length. When a resultRowGet
 * callback and pExprSupp are configured, the stored bytes are passed through the
 * callback and the intermediate buffer is freed here.
 *
 * Fix: the address of tVal is now passed to STREAM_STATE_GET_ROCKSDB, matching
 * streamStateGet_rocksdb. Previously tVal itself (a NULL pointer) was passed as
 * the output location, so the decoded value never reached the caller.
 *
 * Returns the code left by STREAM_STATE_GET_ROCKSDB, or the callback's code.
 */
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
  int    code = 0;
  char*  tVal = NULL;
  size_t tValLen = 0;

  STREAM_STATE_GET_ROCKSDB(pState, "func", key, &tVal, &tValLen);
  if (code != 0) {
    taosMemoryFree(tVal);
    return code;
  }

  if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) {
    // no converter configured: hand the stored buffer over as is
    *pVal = tVal;
    *pVLen = tValLen;
    return code;
  }

  size_t pValLen = 0;
  code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen);
  *pVLen = (int32_t)pValLen;

  taosMemoryFree(tVal);
  return code;
}
3593
/*
 * Remove the entry for `key` from the "func" column family.
 *
 * Fix: returns the code set by STREAM_STATE_DEL_ROCKSDB instead of a hard-coded
 * 0, so delete failures are reported like in the other *Del_rocksdb functions.
 */
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
  return code;
}
3598

3599
// session cf
3600
/*
 * Write `value` into the "sess" column family under (key, pState->number).
 *
 * An empty value (NULL pointer or zero length) is logged as an error but the
 * write still proceeds. If both a resultRowPut callback and pExprSupp are
 * configured, the value is converted first and the converted buffer is stored
 * and then freed.
 *
 * Returns 0 on success, the callback's error code when conversion fails, or
 * whatever STREAM_STATE_PUT_ROCKSDB leaves in `code`.
 */
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
  int              code = 0;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  char*            dst = NULL;
  size_t           size = 0;

  if (value == NULL || vLen == 0) {
    stError("streamStateSessionPut_rocksdb val: %p, len: %d", value, vLen);
  }

  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
    code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
    if (code != 0) {
      return code;
    }
    STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, (int32_t)size);
    taosMemoryFree(dst);
    return code;
  }

  // no converter configured: store the raw bytes
  STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, (void*)value, (int32_t)vLen);
  return code;
}
3622
/*
 * Look up the session whose window starts at key->win.skey.
 *
 * A cursor is positioned at or after `key`; the entry found counts as a hit only
 * if it can be read and its start timestamp equals key->win.skey. On a hit *key
 * is replaced by the stored key, and the value buffer / length are handed out
 * through pVal / pVLen when those pointers are non-NULL.
 *
 * Returns 0 on a hit, -1 otherwise. The cursor and any value buffer that was
 * not handed out are released before returning.
 */
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
  stDebug("streamStateSessionGet_rocksdb");

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
  SSessionKey      resKey = *key;
  void*            tmp = NULL;
  int32_t          vLen = 0;

  int code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, &tmp, &vLen);
  if (code != 0 || key->win.skey != resKey.win.skey) {
    code = -1;
  } else {
    *key = resKey;
    if (pVal) {
      // ownership of the buffer moves to the caller
      *pVal = tmp;
      tmp = NULL;
    }
    if (pVLen) {
      *pVLen = vLen;
    }
  }

  taosMemoryFree(tmp);
  streamStateFreeCur(pCur);
  return code;
}
3647

3648
/*
 * Remove the entry for (key, pState->number) from the "sess" column family.
 * Returns the code set by STREAM_STATE_DEL_ROCKSDB (0 on success).
 */
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) {
  int              code = 0;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};

  STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);

  return code;
}
3654

3655
/*
 * Create a cursor on the "sess" column family positioned on the last non-stale
 * entry before the maximum session key of `groupId`.
 *
 * A sentinel entry {groupId, skey = ekey = INT64_MAX, opNum = pState->number} is
 * written, the iterator is seeked to it and stepped backwards (skipping
 * TTL-expired entries), and the sentinel is deleted again before returning.
 *
 * Fixes over the previous version:
 *  - the result of createStreamStateCursor() is checked before use (it was
 *    dereferenced unconditionally);
 *  - a NULL iterator from streamStateIterCreate is treated as "no cursor";
 *  - the sentinel is removed on those failure paths as well.
 *
 * Returns the cursor (the caller releases it with streamStateFreeCur), or NULL
 * when the sentinel cannot be written, allocation fails, or no valid entry exists.
 */
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, int64_t groupId) {
  stDebug("streamStateSessionSeekToLast_rocksdb");

  int32_t code = 0;

  SSessionKey      maxSessionKey = {.groupId = groupId, .win = {.skey = INT64_MAX, .ekey = INT64_MAX}};
  SStateSessionKey maxKey = {.key = maxSessionKey, .opNum = pState->number};

  STREAM_STATE_PUT_ROCKSDB(pState, "sess", &maxKey, "", 0);
  if (code != 0) {
    return NULL;
  }

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur != NULL) {
    pCur->number = pState->number;
    pCur->db = wrapper->db;
    pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                       (rocksdb_readoptions_t**)&pCur->readOpt);
    if (pCur->iter == NULL) {
      streamStateFreeCur(pCur);
      pCur = NULL;
    } else {
      char    buf[128] = {0};
      int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf);

      // land on the sentinel, then walk back to the last real, non-stale entry
      rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
      rocksdb_iter_prev(pCur->iter);
      while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
        rocksdb_iter_prev(pCur->iter);
      }

      if (!rocksdb_iter_valid(pCur->iter)) {
        streamStateFreeCur(pCur);
        pCur = NULL;
      }
    }
  }

  // the sentinel must never outlive this call; a delete error is not reported
  STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey);
  return pCur;
}
3694

3695
/*
 * Move a session cursor back by one entry.
 *
 * Returns -1 when the cursor or its iterator is NULL, 0 otherwise. The iterator
 * is only stepped when it is currently valid (RocksDB requires a valid iterator
 * before stepping); an invalid iterator is left untouched and 0 is returned.
 *
 * Also fixes the debug message, which named streamStateCurPrev_rocksdb.
 */
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) {
  stDebug("streamStateSessionCurPrev_rocksdb");
  if (!pCur || !pCur->iter) return -1;

  if (rocksdb_iter_valid(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }
  return 0;
}
3702

3703
/*
 * Create a cursor on the "sess" column family positioned on the entry at or
 * before (key, pState->number).
 *
 * The iterator is seeked to the encoded key and TTL-expired entries are skipped
 * backwards. If the search key compares >= the entry reached, that entry is
 * returned; otherwise the iterator steps back once more.
 *
 * Cleanup over the previous version: the unused local `c` and a second validity
 * check that could never fail (the iterator had not moved since the first one)
 * were removed. Behavior is unchanged.
 *
 * Returns the cursor (the caller releases it with streamStateFreeCur), or NULL
 * when allocation or iterator creation fails, the seek fails, or no valid entry
 * remains.
 */
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  pCur->number = pState->number;
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  if (pCur->iter == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  char             buf[128] = {0};
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  int              len = stateSessionKeyEncode(&sKey, buf);
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // skip ttl expired data, moving backwards
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  size_t           klen = 0;
  const char*      iKey = rocksdb_iter_key(pCur->iter, &klen);
  SStateSessionKey curKey = {0};
  TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
  if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) {
    return pCur;
  }

  // the entry reached is past the search key: step back once
  rocksdb_iter_prev(pCur->iter);
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }
  return pCur;
}
3757
/*
 * Create a cursor on the "sess" column family positioned on the entry at or
 * after (key, pState->number).
 *
 * The iterator is seeked to the encoded key. A stale (TTL-expired) entry at that
 * position yields NULL. If the search key compares <= the entry reached, that
 * entry is returned; otherwise the iterator advances once.
 *
 * Returns the cursor (the caller releases it with streamStateFreeCur), or NULL
 * when allocation fails, the seek fails, the entry is stale, or the advance
 * leaves the iterator invalid.
 */
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend
                                                      : pState->pTdbState->pOwner->pBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  char             buf[128] = {0};
  int              len = stateSessionKeyEncode(&sKey, buf);

  if (!streamStateIterSeekAndValid(pCur->iter, buf, len) || iterValueIsStale(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  size_t           klen;
  const char*      iKey = rocksdb_iter_key(pCur->iter, &klen);
  SStateSessionKey curKey = {0};
  TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));

  if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) {
    // the entry reached sorts before the search key: advance once
    rocksdb_iter_next(pCur->iter);
    if (!rocksdb_iter_valid(pCur->iter)) {
      streamStateFreeCur(pCur);
      return NULL;
    }
  }
  return pCur;
}
3798

3799
/*
 * Create a cursor on the "sess" column family positioned on the first non-stale
 * entry strictly after (key, pState->number).
 *
 * The iterator is seeked to the encoded key and TTL-expired entries are skipped
 * forward. If the search key compares < the entry reached, that entry is
 * returned; otherwise the iterator advances once.
 *
 * Returns the cursor (the caller releases it with streamStateFreeCur), or NULL
 * when allocation fails, the seek fails, or no valid entry remains.
 */
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyNext_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend
                                                      : pState->pTdbState->pOwner->pBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  char             buf[128] = {0};
  int              len = stateSessionKeyEncode(&sKey, buf);
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // skip ttl expired data, moving forwards
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_next(pCur->iter);
  }
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  size_t           klen;
  const char*      iKey = rocksdb_iter_key(pCur->iter, &klen);
  SStateSessionKey curKey = {0};
  TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));

  if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) {
    // not yet past the search key: advance once
    rocksdb_iter_next(pCur->iter);
    if (!rocksdb_iter_valid(pCur->iter)) {
      streamStateFreeCur(pCur);
      return NULL;
    }
  }
  return pCur;
}
3842

3843
/*
 * Create a cursor on the "sess" column family positioned on the last non-stale
 * entry strictly before (key, pState->number).
 *
 * The iterator is seeked to the encoded key and TTL-expired entries are skipped
 * backwards. If the search key compares > the entry reached, that entry is
 * returned; otherwise the iterator steps back once.
 *
 * Returns the cursor (the caller releases it with streamStateFreeCur), or NULL
 * when allocation fails, the seek fails, or no valid entry remains.
 */
SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyPrev_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend
                                                      : pState->pTdbState->pOwner->pBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
  char             buf[128] = {0};
  int              len = stateSessionKeyEncode(&sKey, buf);
  if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // skip ttl expired data, moving backwards
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  size_t           klen;
  const char*      iKey = rocksdb_iter_key(pCur->iter, &klen);
  SStateSessionKey curKey = {0};
  TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));

  if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) {
    // not yet before the search key: step back once
    rocksdb_iter_prev(pCur->iter);
    if (!rocksdb_iter_valid(pCur->iter)) {
      streamStateFreeCur(pCur);
      return NULL;
    }
  }
  return pCur;
}
3885

3886
/*
 * Read the session key and value the cursor currently points at.
 *
 * pState  state handle; may be NULL, in which case no result-row conversion is done
 * pCur    cursor; NULL yields -1
 * pKey    in: a non-zero groupId restricts the read to that group;
 *         out: the decoded session key
 * pVal    optional; receives a heap buffer with the value (caller frees it)
 * pVLen   optional; receives the value length
 *
 * *pVal and *pVLen are reset to NULL / 0 once the key has been decoded.
 *
 * Returns 0 on success; -1 when the cursor is NULL, the iterator is invalid or
 * stale, the value decodes to a negative length, or the entry belongs to another
 * operator or group; or the resultRowGet callback's error code.
 */
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey,
                                             void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    return -1;
  }

  SStateSessionKey ktmp = {0};
  size_t           kLen = 0;
  const char*      curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
  TAOS_UNUSED(stateSessionKeyDecode((void*)&ktmp, (char*)curKey));

  if (pVal != NULL) *pVal = NULL;
  if (pVLen != NULL) *pVLen = 0;

  size_t      vLen = 0;
  const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
  char*       val = NULL;
  int32_t     len = valueDecode((void*)vval, vLen, NULL, &val);
  if (len < 0) {
    taosMemoryFree(val);
    return -1;
  }

  // reject entries of another operator, or of another group when one was requested
  if (ktmp.opNum != pCur->number || (pKey->groupId != 0 && pKey->groupId != ktmp.key.groupId)) {
    taosMemoryFree(val);
    return -1;
  }

  char*  tVal = val;
  size_t tVlen = len;

  if (pVal == NULL) {
    taosMemoryFree(val);
  } else if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
    int code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen);
    taosMemoryFree(val);
    if (code != 0) {
      return code;
    }
    *pVal = (char*)tVal;
  } else {
    *pVal = (char*)tVal;
  }

  if (pVLen != NULL) *pVLen = (int32_t)tVlen;

  *pKey = ktmp.key;
  return 0;
}
3945
// fill cf
3946
/*
 * Write `value` (vLen bytes) under `key` in the "fill" column family.
 * Returns the local `code`, which starts at 0 and is expected to be updated by
 * STREAM_STATE_PUT_ROCKSDB (macro defined elsewhere in this file).
 */
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  int code = 0;
  STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen);
  return code;
}
3952

3953
/*
 * Look up `key` in the "fill" column family; the value and its length are delivered
 * through pVal / pVLen by STREAM_STATE_GET_ROCKSDB (macro defined elsewhere in this file).
 * Returns the local `code`, which starts at 0 and is expected to be updated by the macro.
 */
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  int code = 0;
  STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
  return code;
}
3959
/*
 * Delete `key` from the "fill" column family.
 * Returns the local `code`, which starts at 0 and is expected to be updated by
 * STREAM_STATE_DEL_ROCKSDB (macro defined elsewhere in this file).
 */
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "fill", key);
  return code;
}
3964

3965
/*
 * Open a cursor on the "fill" column family positioned exactly on `key`.
 *
 * Returns the cursor only when the seek succeeds, the entry is not stale and
 * winKeyCmpr reports the entry equal to `key`; in every other case the cursor is
 * released and NULL is returned. The cursor must be released with streamStateFreeCur.
 */
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateFillGetCur_rocksdb");
  SStreamStateCur* pCur = createStreamStateCursor();

  // The recalc flag selects which backend the cursor reads from.
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }

  if (pCur == NULL) {
    return NULL;
  }

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  char encoded[128] = {0};
  int  encodedLen = winKeyEncode((void*)key, encoded);

  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen) || iterValueIsStale(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  if (rocksdb_iter_valid(pCur->iter)) {
    size_t  foundLen;
    SWinKey found;
    char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
    TAOS_UNUSED(winKeyDecode((void*)&found, rawKey));
    if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) == 0) {
      return pCur;
    }
  }

  streamStateFreeCur(pCur);
  return NULL;
}
4004
/*
 * Read the "fill" entry under the cursor.
 *
 * On success returns 0, stores the decoded window key in *pKey, passes pVal to
 * valueDecode as the destination for the value, and (if pVLen is non-NULL) stores the
 * decoded length in *pVLen.
 * Returns -1 for a NULL cursor, an invalid or stale iterator, or a negative length
 * from valueDecode.
 */
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    return -1;
  }

  size_t  keyBytes, valBytes;
  SWinKey decodedKey;

  char* rawKey = (char*)rocksdb_iter_key(pCur->iter, &keyBytes);
  TAOS_UNUSED(winKeyDecode(&decodedKey, rawKey));

  const char* rawVal = rocksdb_iter_value(pCur->iter, &valBytes);
  int32_t     decodedLen = valueDecode((void*)rawVal, valBytes, NULL, (char**)pVal);
  if (decodedLen < 0) {
    return -1;
  }

  if (pVLen != NULL) {
    *pVLen = decodedLen;
  }
  *pKey = decodedKey;
  return 0;
}
4026

4027
/*
 * Open a cursor on the "fill" column family positioned after `key`.
 *
 * The iterator is sought to `key`, stale entries are skipped forward, and then:
 *   - if winKeyCmpr(key, current) is negative the cursor is returned as is;
 *   - otherwise the iterator is advanced one step and the cursor is returned.
 * Returns NULL when the cursor cannot be created, the seek fails, or the iterator is
 * no longer valid after skipping stale entries. The returned cursor must be released
 * with streamStateFreeCur.
 */
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateFillSeekKeyNext_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  char encoded[128] = {0};
  int  encodedLen = winKeyEncode((void*)key, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // skip stale data
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_next(pCur->iter);
  }

  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SWinKey found;
  size_t  foundLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
  TAOS_UNUSED(winKeyDecode((void*)&found, rawKey));

  // Step once more unless the comparison is already negative.
  if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) >= 0) {
    rocksdb_iter_next(pCur->iter);
  }
  return pCur;
}
4068
/*
 * Open a cursor on the "fill" column family positioned before `key`.
 *
 * The iterator is sought to `key`, stale entries are skipped backward, and then:
 *   - if winKeyCmpr(key, current) is positive the cursor is returned as is;
 *   - otherwise the iterator is moved back one step and the cursor is returned.
 * Returns NULL when the cursor cannot be created, the seek fails, or the iterator is
 * no longer valid after skipping stale entries. The returned cursor must be released
 * with streamStateFreeCur.
 */
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateFillSeekKeyPrev_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }

  SStreamStateCur* pCur = createStreamStateCursor();
  if (!pCur) {
    return NULL;
  }

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  char encoded[128] = {0};
  int  encodedLen = winKeyEncode((void*)key, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // Walk backward over stale entries.
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }

  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SWinKey found;
  size_t  foundLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
  TAOS_UNUSED(winKeyDecode((void*)&found, rawKey));

  // Step back once more unless the comparison is already positive.
  if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) <= 0) {
    rocksdb_iter_prev(pCur->iter);
  }
  return pCur;
}
4109

4110
/*
 * Convenience wrapper: runs streamStateFillSeekKeyPrev_rocksdb with the largest
 * representable key (groupId = UINT64_MAX, ts = INT64_MAX) and returns its result.
 */
SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) {
  SWinKey maxKey = {.groupId = UINT64_MAX, .ts = INT64_MAX};
  return streamStateFillSeekKeyPrev_rocksdb(pState, &maxKey);
}
4114

4115
#ifdef BUILD_NO_CALL
4116
/*
 * Find a stored session whose range matches `key` according to sessionRangeKeyCmpr.
 *
 * The entry found by seeking to `key` is tried first. If it does not match, exactly
 * one neighbour is tried: the next entry when the search key compares greater than the
 * entry found, the previous one when it compares smaller.
 * Returns 0 and fills *curKey on a match, -1 otherwise (including when the cursor
 * cannot be created or the seek fails).
 */
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
  stDebug("streamStateSessionGetKeyByRange_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return -1;
  }
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateSessionKey probe = {.key = *key, .opNum = pState->number};
  char             encoded[128] = {0};
  int              encodedLen = stateSessionKeyEncode(&probe, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return -1;
  }

  size_t           foundLen;
  const char*      rawKey = rocksdb_iter_key(pCur->iter, &foundLen);
  SStateSessionKey found = {0};
  stateSessionKeyDecode(&found, (char*)rawKey);

  // Decides which neighbour to try if the entry under the cursor does not match.
  int32_t order = stateSessionKeyCmpr(&probe, sizeof(probe), &found, sizeof(found));

  SSessionKey resKey = *key;
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
  bool        matched = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);

  if (!matched && order != 0) {
    if (order > 0) {
      streamStateCurNext_rocksdb(pCur);
    } else {
      streamStateCurPrev(pState, pCur);
    }
    code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
    matched = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);
  }

  if (matched) {
    *curKey = resKey;
  }
  streamStateFreeCur(pCur);
  return matched ? code : -1;
}
4176
#endif
4177

4178
/*
 * Find a stored session that overlaps [key->win.skey - gap, key->win.ekey + gap].
 *
 * In:  *pVLen is the size of the value buffer the caller expects back.
 * Out: *pVal always receives a freshly allocated buffer of that size, owned by the
 *      caller. When a matching session is found (return 0) *key is the stored key and
 *      the buffer holds the stored value; otherwise (return 1) *key is restored and the
 *      buffer is zero-filled. Returns terrno if the buffer cannot be allocated.
 *
 * The stored value may be longer or shorter than the requested size. The copy is
 * bounded by the buffer size, any unused tail is zeroed, and *pVLen is clamped to the
 * buffer size when the stored value had to be truncated. (The previous code copied the
 * stored length unconditionally, overflowing the buffer for longer values.)
 *
 * NOTE(review): *pVal is released on the lookup-failure path, so callers are expected
 * to pass in NULL or a heap pointer - confirm against callers.
 */
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                                int32_t* pVLen) {
  stDebug("streamStateSessionAddIfNotExist_rocksdb");
  // todo refactor
  int32_t     res = 0;
  bool        found = false;
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;
  int32_t valSize = *pVLen;

  void* tmp = taosMemoryMalloc(valSize);
  if (tmp == NULL) {
    return terrno;
  }

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);

  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      found = true;
      goto _end;
    }
    taosMemoryFreeClear(*pVal);
    streamStateCurNext_rocksdb(pCur);
  } else {
    // Nothing at or before the key: restart from the first entry after it.
    *key = originKey;
    streamStateFreeCur(pCur);
    taosMemoryFreeClear(*pVal);
    pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
  }

  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
  if (code == 0 && sessionRangeKeyCmpr(&searchKey, key) == 0) {
    found = true;
    goto _end;
  }

  *key = originKey;
  res = 1;

_end:
  if (found) {
    // Never copy more than the destination can hold.
    int32_t copyLen = (*pVLen < valSize) ? *pVLen : valSize;
    if (copyLen < 0) {
      copyLen = 0;
    }
    if (copyLen > 0) {
      memcpy(tmp, *pVal, copyLen);
    }
    if (valSize > copyLen) {
      memset((char*)tmp + copyLen, 0, valSize - copyLen);
    }
    if (*pVLen > valSize) {
      *pVLen = valSize;
    }
  } else {
    memset(tmp, 0, valSize);
  }

  taosMemoryFree(*pVal);
  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
}
4230
/*
 * Overwrite the value of every session entry reachable from the zero key with zero
 * bytes of the same length. Iteration stops at the first entry that cannot be read or
 * whose value is empty.
 */
void streamStateSessionClear_rocksdb(SStreamState* pState) {
  stDebug("streamStateSessionClear_rocksdb");
  SSessionKey      startKey = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &startKey);

  for (;;) {
    SSessionKey delKey = {0};
    void*       buf = NULL;
    int32_t     size = 0;
    int32_t     code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &delKey, &buf, &size);

    if (code != 0 || size <= 0) {
      taosMemoryFreeClear(buf);
      break;
    }

    memset(buf, 0, size);
    // refactor later
    TAOS_UNUSED(streamStateSessionPut_rocksdb(pState, &delKey, buf, size));
    taosMemoryFreeClear(buf);

    streamStateCurNext_rocksdb(pCur);
  }

  streamStateFreeCur(pCur);
}
1✔
4254
/*
 * Find a stored state window that `key` belongs to.
 *
 * The entry at or before `key` matches when its window encloses the requested window
 * or when fn(pKeyData, <state key>) is true. The state key is read from the stored
 * value at offset (requested *pVLen - keyDataLen). If that entry does not match, the
 * following entry is tested with fn only.
 * Returns 0 on a match (*key, *pVal, *pVLen describe the stored entry; *pVal is handed
 * to the caller). Returns 1 when nothing matches (*key restored, *pVal cleared).
 */
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
                                              int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  stDebug("streamStateStateAddIfNotExist_rocksdb");
  // todo refactor
  int32_t     res = 0;
  SSessionKey wanted = *key;
  int32_t     valSize = *pVLen;
  int32_t     stateKeyOffset = valSize - keyDataLen;

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
  if (code != 0) {
    *key = wanted;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
  } else {
    if (key->win.skey <= wanted.win.skey && wanted.win.ekey <= key->win.ekey) {
      goto _end;
    }
    if (fn(pKeyData, (char*)(*pVal) + stateKeyOffset) == true) {
      goto _end;
    }
    streamStateCurNext_rocksdb(pCur);
  }

  taosMemoryFreeClear(*pVal);
  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
  if (code == 0 && fn(pKeyData, (char*)(*pVal) + stateKeyOffset) == true) {
    goto _end;
  }
  taosMemoryFreeClear(*pVal);

  *key = wanted;
  res = 1;

_end:
  if (res == 0 && valSize > *pVLen){
    stError("[InternalERR] [skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],valSize:%d bigger than get rocksdb len:%d", key->win.skey, key->win.ekey, key->groupId, valSize, *pVLen);
  }
  streamStateFreeCur(pCur);
  return res;
}
4300

4301
//  partag cf
4302
/*
 * Store the tag value of a partition group in the "partag" column family.
 *
 * When a resultRowPut hook and pExprSupp are both set and `tag` is non-NULL, the tag
 * is first passed through the hook and the converted buffer is stored and then freed.
 * Otherwise the tag bytes are stored as given.
 * Returns the hook's non-zero status, or the local `code` as left by
 * STREAM_STATE_PUT_ROCKSDB (macro defined elsewhere in this file).
 */
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
  int code = 0;

  bool convert = (pState->pResultRowStore.resultRowPut != NULL) && (pState->pExprSupp != NULL) && (tag != NULL);
  if (!convert) {
    STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
    return code;
  }

  char*  converted = NULL;
  size_t convertedLen = 0;
  code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, tag, tagLen, &converted, &convertedLen);
  if (code != 0) {
    return code;
  }

  STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, converted, (int32_t)convertedLen);
  taosMemoryFree(converted);
  return code;
}
4318

4319
/*
 * Initialise the caller-supplied cursor on the "partag" column family and position it
 * after `groupId`.
 *
 * After the seek, stale entries are skipped forward. If the entry under the iterator
 * has a group id greater than `groupId` the iterator stays there; otherwise it is
 * advanced one step. The function returns early, leaving the cursor initialised, when
 * the column family index is unknown or the seek fails. Does nothing for a NULL cursor.
 */
void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t groupId, SStreamStateCur* pCur) {
  if (!pCur) {
    return;
  }

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }

  pCur->number = pState->number;
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "partag", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);

  int cfIdx = streamStateGetCfIdx(pState, "partag");
  if (cfIdx < 0) {
    stError("streamState failed to put to cf name:%s", "partag");
    return;
  }

  char    encoded[128] = {0};
  int32_t encodedLen = ginitDict[cfIdx].enFunc((void*)&groupId, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    return;
  }

  // skip ttl expired data
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_next(pCur->iter);
  }

  if (!rocksdb_iter_valid(pCur->iter)) {
    return;
  }

  int64_t foundGroupId;
  size_t  foundLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
  TAOS_UNUSED(parKeyDecode((void*)&foundGroupId, rawKey));

  if (foundGroupId <= groupId) {
    rocksdb_iter_next(pCur->iter);
  }
}
4358

4359
/*
 * Read the "partag" entry under the cursor.
 *
 * The decoded group id is stored in *pGroupId. When pVal is non-NULL it is passed to
 * valueDecode as the destination for the value, and *pVLen (if non-NULL) receives the
 * decoded length.
 * Returns 0 on success; -1 for a NULL cursor, an invalid or stale iterator, or a
 * negative length from valueDecode.
 *
 * Changes: the debug line now names this function (it was copied from the fill
 * variant), the unused SWinKey local is gone, and the length locals are initialised.
 */
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, void** pVal, int32_t* pVLen) {
  stDebug("streamStateParTagGetKVByCur_rocksdb");
  if (!pCur) {
    return -1;
  }
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    return -1;
  }

  size_t klen = 0, vlen = 0;
  char*  keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
  (void)parKeyDecode(pGroupId, keyStr);

  if (pVal) {
    const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
    int32_t     len = valueDecode((void*)valStr, vlen, NULL, (char**)pVal);
    if (len < 0) {
      return -1;
    }
    if (pVLen != NULL) *pVLen = len;
  }

  return 0;
}
4384

4385
/*
 * Fetch the tag value stored for `groupId` in the "partag" column family.
 *
 * The stored bytes are read with STREAM_STATE_GET_ROCKSDB (macro defined elsewhere in
 * this file) and then passed through the resultRowGet hook, which writes the result to
 * *tagVal. *tagLen receives the resulting length when tagLen is non-NULL. If no hook
 * (or no pExprSupp) is configured, the stored bytes are handed to the caller unchanged,
 * mirroring what streamStatePutParTag_rocksdb stores in that configuration.
 * Returns 0 on success, otherwise the status left by the macro or returned by the hook.
 *
 * Fixes:
 *  - tVal is initialised, so the error path no longer frees an indeterminate pointer
 *    when the macro bails out before assigning it;
 *  - the hook writes its length into a real size_t instead of through a cast of the
 *    caller's int32_t*, which made it store 8 bytes into a 4-byte object.
 */
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
  int    code = 0;
  char*  tVal = NULL;
  size_t tValLen = 0;
  STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, &tVal, &tValLen);
  if (code != 0) {
    taosMemoryFree(tVal);
    return code;
  }

  if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) {
    // No conversion hook: ownership of the stored bytes moves to the caller.
    *tagVal = tVal;
    if (tagLen != NULL) *tagLen = (int32_t)tValLen;
    return code;
  }

  size_t outLen = 0;
  code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, &outLen);
  taosMemoryFree(tVal);
  if (code == 0 && tagLen != NULL) {
    *tagLen = (int32_t)outLen;
  }

  return code;
}
4399

4400
// parname cf
4401
/*
 * Store the table name of a partition group in the "parname" column family.
 * Exactly TSDB_TABLE_NAME_LEN bytes of `tbname` are written.
 * Returns the local `code`, which starts at 0 and is expected to be updated by
 * STREAM_STATE_PUT_ROCKSDB (macro defined elsewhere in this file).
 */
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  int code = 0;
  STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN);
  return code;
}
4406
/*
 * Fetch the table name stored for `groupId` in the "parname" column family.
 * The value is delivered through pVal; its length is read into a local and discarded.
 * Returns the local `code`, which starts at 0 and is expected to be updated by
 * STREAM_STATE_GET_ROCKSDB (macro defined elsewhere in this file).
 */
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) {
  int    code = 0;
  size_t ignoredLen;
  STREAM_STATE_GET_ROCKSDB(pState, "parname", &groupId, pVal, &ignoredLen);
  return code;
}
4412

4413
/*
 * Remove the table name stored for `groupId` from the "parname" column family.
 * Returns the local `code`, which starts at 0 and is expected to be updated by
 * STREAM_STATE_DEL_ROCKSDB (macro defined elsewhere in this file).
 */
int32_t streamStateDeleteParName_rocksdb(SStreamState* pState, int64_t groupId) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "parname", &groupId);
  return code;
}
4418

4419
/*
 * Store `pVal` (pVLen bytes) under `key` in the "default" column family.
 * Returns the local `code`, which starts at 0 and is expected to be updated by
 * STREAM_STATE_PUT_ROCKSDB (macro defined elsewhere in this file).
 */
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
  int code = 0;
  STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);
  return code;
}
4424
/*
 * Look up `key` in the "default" column family; value and length are delivered through
 * pVal / pVLen by STREAM_STATE_GET_ROCKSDB (macro defined elsewhere in this file).
 * Returns the local `code`, which starts at 0 and is expected to be updated by the macro.
 */
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
  int code = 0;
  STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen);
  return code;
}
4429
/*
 * Delete `key` from the "default" column family.
 * Returns the local `code`, which starts at 0 and is expected to be updated by
 * STREAM_STATE_DEL_ROCKSDB (macro defined elsewhere in this file).
 */
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "default", key);
  return code;
}
4434

4435
/*
 * Collect the numeric suffixes of all "default" column family keys of the form
 * "<start>:<int64>".
 *
 * Iteration begins at `start` (a NUL-terminated prefix) and stops at the first key that
 * does not begin with the prefix, is not longer than the prefix, or (when `end` is
 * non-NULL) compares greater than `end`. Entries whose value fails valueDecode are
 * skipped. Each parsed number is appended to `result`.
 * Returns 0 on success, -1 if the iterator cannot be created, or terrno if appending
 * to `result` fails.
 *
 * Fixes:
 *  - the suffix is parsed from key + strlen(start); the old code parsed from
 *    key + strlen(key), i.e. from the terminator, so nothing was ever collected;
 *  - the value length is read into a size_t (it was an int32_t passed as size_t*);
 *  - the iterator key is copied into a NUL-terminated buffer before any str* call,
 *    since the iterator reports key length separately.
 */
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
  int code = 0;

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }
  rocksdb_snapshot_t*    snapshot = NULL;
  rocksdb_readoptions_t* readopts = NULL;
  rocksdb_iterator_t*    pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
  if (pIter == NULL) {
    return -1;
  }

  const char*  prefix = (const char*)start;
  const size_t prefixLen = strlen(prefix);

  rocksdb_iter_seek(pIter, prefix, prefixLen);
  while (rocksdb_iter_valid(pIter)) {
    size_t      klen = 0;
    size_t      vlen = 0;
    const char* rawKey = rocksdb_iter_key(pIter, &klen);
    const char* vval = rocksdb_iter_value(pIter, &vlen);

    int32_t len = valueDecode((void*)vval, (int32_t)vlen, NULL, NULL);
    if (len < 0) {
      rocksdb_iter_next(pIter);
      continue;
    }

    // Keys are encoded into 128-byte buffers elsewhere in this file, so 128 bytes is
    // enough for a terminated copy; anything longer is truncated for the comparison.
    char key[128] = {0};
    if (klen >= sizeof(key)) {
      klen = sizeof(key) - 1;
    }
    memcpy(key, rawKey, klen);

    if (end != NULL && strcmp(key, (const char*)end) > 0) {
      break;
    }
    if (strncmp(key, prefix, prefixLen) != 0 || strlen(key) < prefixLen + 1) {
      break;
    }

    int64_t checkPoint = 0;
    if (sscanf(key + prefixLen, ":%" PRId64, &checkPoint) == 1) {
      if (taosArrayPush(result, &checkPoint) == NULL) {
        code = terrno;
        break;
      }
    }
    rocksdb_iter_next(pIter);
  }

  rocksdb_release_snapshot(wrapper->db, snapshot);
  rocksdb_readoptions_destroy(readopts);
  rocksdb_iter_destroy(pIter);
  return code;
}
4483
#ifdef BUILD_NO_CALL
4484
/*
 * Create a cursor over the "default" column family.
 * Returns the cursor (an SStreamStateCur*) or NULL when it cannot be allocated; the
 * allocation is now checked before use, as the other cursor constructors in this file do.
 */
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;
  return pCur;
}
4497
bool streamDefaultIterValid_rocksdb(void* iter) {
4498
  if (iter) {
4499
    return false;
4500
  }
4501
  SStreamStateCur* pCur = iter;
4502
  return (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) ? true : false;
4503
}
4504
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
4505
  SStreamStateCur* pCur = iter;
4506
  rocksdb_iter_seek(pCur->iter, key, strlen(key));
4507
}
4508
void streamDefaultIterNext_rocksdb(void* iter) {
4509
  SStreamStateCur* pCur = iter;
4510
  rocksdb_iter_next(pCur->iter);
4511
}
4512
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
4513
  SStreamStateCur* pCur = iter;
4514
  return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len);
4515
}
4516
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
4517
  SStreamStateCur* pCur = iter;
4518
  char*            ret = NULL;
4519

4520
  int32_t     vlen = 0;
4521
  const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vlen);
4522
  *len = valueDecode((void*)val, vlen, NULL, &ret);
4523
  if (*len < 0) {
4524
    taosMemoryFree(ret);
4525
    return NULL;
4526
  }
4527

4528
  return ret;
4529
}
4530
#endif
4531
// batch func
4532
/* Create an empty RocksDB write batch; release it with streamStateDestroyBatch. */
void* streamStateCreateBatch() {
  return rocksdb_writebatch_create();
}
4536
/* Number of updates queued in the batch; 0 for a NULL batch. */
int32_t streamStateGetBatchSize(void* pBatch) {
  return (pBatch == NULL) ? 0 : rocksdb_writebatch_count(pBatch);
}
4540

4541
/* Drop every update queued in the batch; the batch itself stays usable. */
void streamStateClearBatch(void* pBatch) {
  rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch);
}
1✔
4542
/* Destroy a batch created by streamStateCreateBatch. */
void streamStateDestroyBatch(void* pBatch) {
  rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch);
}
12✔
4543
/*
 * Queue one key/value pair for column family `cfKeyName` in a write batch.
 *
 * The key and value are encoded with that column family's enFunc / enValueFunc (the
 * latter also receives `ttl`). The backend's dataWritten counter is incremented before
 * the column family is resolved.
 * Returns 0 on success, -1 when the column family name is unknown.
 */
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
                            void* val, int32_t vlen, int64_t ttl) {
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }
  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));

  int cfIdx = streamStateGetCfIdx(pState, cfKeyName);
  if (cfIdx < 0) {
    stError("streamState failed to put to cf name:%s", cfKeyName);
    return -1;
  }

  char    encodedKey[128] = {0};
  int32_t encodedKeyLen = ginitDict[cfIdx].enFunc((void*)key, encodedKey);

  char*   encodedVal = NULL;
  int32_t encodedValLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &encodedVal);

  rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
  rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, encodedKey, (size_t)encodedKeyLen, encodedVal,
                            (size_t)encodedValLen);
  taosMemoryFree(encodedVal);

  // Human-readable key, used only for the trace line below.
  char keyText[256] = {0};
  TAOS_UNUSED(ginitDict[cfIdx].toStrFunc((void*)key, keyText));
  stTrace("streamState str: %s succ to write to %s_%s, len: %d", keyText, wrapper->idstr, ginitDict[cfIdx].key, vlen);

  return 0;
}
4575

4576
/*
 * Queue one key/value pair in a write batch, addressing the column family by its index
 * in ginitDict instead of by name.
 *
 * When a resultRowPut hook and pExprSupp are both set, `val` is first converted by the
 * hook and the converted buffer is freed once queued. `tmpBuf`, if non-NULL, is offered
 * to enValueFunc as the encoding buffer; when it is NULL the buffer produced by
 * enValueFunc is freed here.
 * Returns 0 on success or the hook's non-zero status.
 */
int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key,
                                    void* val, int32_t vlen, int64_t ttl, void* tmpBuf) {
  char encodedKey[128] = {0};
  char keyText[128] = {0};

  const bool useHook = (pState->pResultRowStore.resultRowPut != NULL) && (pState->pExprSupp != NULL);

  char*  payload = NULL;
  size_t payloadLen = 0;
  if (useHook) {
    int32_t code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &payload, &payloadLen);
    if (code != 0) {
      return code;
    }
  } else {
    payload = val;
    payloadLen = vlen;
  }

  int32_t encodedKeyLen = ginitDict[cfIdx].enFunc((void*)key, encodedKey);

  ginitDict[cfIdx].toStrFunc((void*)key, keyText);
  stDebug("[InternalERR] write cfIdx:%d key:%s vlen:%d", cfIdx, keyText, vlen);

  char*   encodedVal = tmpBuf;
  int32_t encodedValLen = ginitDict[cfIdx].enValueFunc(payload, payloadLen, ttl, &encodedVal);

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }

  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));

  rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
  rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, encodedKey, (size_t)encodedKeyLen, encodedVal,
                            (size_t)encodedValLen);

  // Only a buffer produced by the hook belongs to this function.
  if (useHook) {
    taosMemoryFree(payload);
  }
  // Without a caller-supplied scratch buffer the encoded value is freed here.
  if (tmpBuf == NULL) {
    taosMemoryFree(encodedVal);
  }

  char traceText[256] = {0};
  TAOS_UNUSED(ginitDict[cfIdx].toStrFunc((void*)key, traceText));
  stTrace("streamState str: %s succ to write to %s_%s", traceText, wrapper->idstr, ginitDict[cfIdx].key);

  return 0;
}
4626
/*
 * Write a previously filled batch to the backend selected by the recalc flag.
 * The backend's dataWritten counter is incremented before the write.
 * Returns 0 on success, -1 when rocksdb_write reports an error (the message is logged
 * and freed).
 */
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (pState->pTdbState->recalc) {
    wrapper = pState->pTdbState->pOwner->pRecalBackend;
  }
  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));

  char* err = NULL;
  rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
  if (err == NULL) {
    stDebug("write batch to backend:%p", wrapper->db);
    return 0;
  }

  stError("streamState failed to write batch, err:%s", err);
  taosMemoryFree(err);
  return -1;
}
4643
/*
 * Round x up to a power of two, with a floor of 2.
 *   x <= 1              -> 2
 *   x is a power of two -> x
 *   otherwise           -> the next larger power of two
 * Arithmetic is unsigned 32-bit, so inputs above 2^31 wrap to 0.
 */
uint32_t nextPow2(uint32_t x) {
  if (x <= 1) {
    return 2;
  }

  // Smear the highest set bit of (x - 1) into every lower position, then add one.
  uint32_t v = x - 1;
  for (uint32_t shift = 1; shift < 32; shift <<= 1) {
    v |= v >> shift;
  }
  return v + 1;
}
4653

4654
#ifdef BUILD_NO_CALL
4655
int32_t copyFiles(const char* src, const char* dst) {
4656
  int32_t code = 0;
4657
  // opt later, just hard link
4658
  int32_t sLen = strlen(src);
4659
  int32_t dLen = strlen(dst);
4660
  char*   srcName = taosMemoryCalloc(1, sLen + 64);
4661
  char*   dstName = taosMemoryCalloc(1, dLen + 64);
4662

4663
  TdDirPtr pDir = taosOpenDir(src);
4664
  if (pDir == NULL) {
4665
    taosMemoryFree(srcName);
4666
    taosMemoryFree(dstName);
4667
    return -1;
4668
  }
4669

4670
  TdDirEntryPtr de = NULL;
4671
  while ((de = taosReadDir(pDir)) != NULL) {
4672
    char* name = taosGetDirEntryName(de);
4673
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
4674

4675
    sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
4676
    sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
4677
    if (!taosDirEntryIsDir(de)) {
4678
      code = taosCopyFile(srcName, dstName);
4679
      if (code == -1) {
4680
        goto _err;
4681
      }
4682
    }
4683

4684
    memset(srcName, 0, sLen + 64);
4685
    memset(dstName, 0, dLen + 64);
4686
  }
4687

4688
_err:
4689
  taosMemoryFreeClear(srcName);
4690
  taosMemoryFreeClear(dstName);
4691
  taosCloseDir(&pDir);
4692
  return code >= 0 ? 0 : -1;
4693
}
4694
#endif
4695

4696
/*
 * Tell whether a backend file name is RocksDB metadata rather than data.
 * Returns 1 when the first `len` bytes of `name` are exactly "CURRENT" or begin with
 * "MANIFEST-"; 0 otherwise.
 *
 * Both comparisons are bounded by `len`, so `name` need not be NUL-terminated. The
 * "CURRENT" branch used strcmp, which ignored `len` and could read past the buffer.
 */
int32_t isBkdDataMeta(char* name, int32_t len) {
  static const char kCurrent[] = "CURRENT";
  static const char kManifest[] = "MANIFEST-";

  const int32_t currLen = (int32_t)(sizeof(kCurrent) - 1);
  const int32_t maniLen = (int32_t)(sizeof(kManifest) - 1);

  if (len >= maniLen && strncmp(name, kManifest, maniLen) == 0) {
    return 1;
  }
  if (len == currLen && strncmp(name, kCurrent, currLen) == 0) {
    return 1;
  }
  return 0;
}
4710
/*
 * Append to `diff` a heap-allocated copy of every key of `p2` that is neither a
 * metadata file name (see isBkdDataMeta) nor present in `p1`.
 * Returns 0 on success, or terrno when an allocation or the array push fails. Entries
 * pushed before a failure stay in `diff`.
 */
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
  size_t keyLen = 0;

  for (void* pIter = taosHashIterate(p2, NULL); pIter != NULL; pIter = taosHashIterate(p2, pIter)) {
    char* name = taosHashGetKey(pIter, &keyLen);
    if (isBkdDataMeta(name, keyLen) || taosHashGet(p1, name, keyLen)) {
      continue;
    }

    int32_t cap = keyLen + 1;
    char*   copy = taosMemoryCalloc(1, cap);
    if (copy == NULL) {
      return terrno;
    }
    tstrncpy(copy, name, cap);

    if (taosArrayPush(diff, &copy) == NULL) {
      taosMemoryFree(copy);
      return terrno;
    }
  }

  return 0;
}
4732
/*
 * Compute the difference between two file-name tables:
 *   add <- names present in p2 but not in p1
 *   del <- names present in p1 but not in p2
 * Returns 0 on success or the first non-zero status from compareHashTableImpl.
 *
 * Fix: the second pass now runs when the first one SUCCEEDS. The condition was
 * inverted (`code != 0`), so `del` was never filled on the normal path and a failure
 * of the first pass could be masked by the second.
 */
int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
  int32_t code = compareHashTableImpl(p1, p2, add);
  if (code == 0) {
    code = compareHashTableImpl(p2, p1, del);
  }

  return code;
}
4742

4743
/*
 * Build a comma-separated list of the table's keys for debug output.
 * On success *buf receives a heap string owned by the caller (empty for an empty
 * table). *buf is left untouched when allocation fails or the list does not fit the
 * (16 bytes per entry + 4) budget; both cases are logged.
 */
void hashTableToDebug(SHashObj* pTbl, char** buf) {
  size_t  entries = taosHashGetSize(pTbl);
  int32_t used = 0;
  int32_t cap = entries * 16 + 4;

  char* out = taosMemoryCalloc(1, cap);
  if (out == NULL) {
    stError("failed to alloc memory for stream snapshot debug info");
    return;
  }

  for (void* pIter = taosHashIterate(pTbl, NULL); pIter != NULL; pIter = taosHashIterate(pTbl, pIter)) {
    size_t keyLen = 0;
    char*  name = taosHashGetKey(pIter, &keyLen);
    if (name == NULL || keyLen <= 0) {
      continue;
    }

    // `used` always equals strlen(out): every accepted write appends exactly nBytes.
    int32_t left = cap - used;
    int32_t nBytes = snprintf(out + used, left, "%s,", name);
    if (nBytes <= 0 || nBytes >= left) {
      stError("failed to debug snapshot info since %s", tstrerror(TSDB_CODE_OUT_OF_RANGE));
      taosMemoryFree(out);
      return;
    }
    used += nBytes;
  }

  // Drop the trailing comma.
  if (used > 0) {
    out[used - 1] = 0;
  }
  *buf = out;
}
4778
/*
 * Build a comma-separated list of the strings in `pArr` for debug output.
 * On success *buf receives a heap string owned by the caller. *buf is left untouched
 * when the array is empty, allocation fails, or the list does not fit the
 * (64 bytes per entry + 64) budget; the last two cases are logged.
 */
void strArrayDebugInfo(SArray* pArr, char** buf) {
  int32_t count = taosArrayGetSize(pArr);
  if (count <= 0) {
    return;
  }

  int32_t used = 0;
  int32_t cap = 64 + count * 64;

  char* out = (char*)taosMemoryCalloc(1, cap);
  if (out == NULL) {
    stError("failed to alloc memory for stream snapshot debug info");
    return;
  }

  for (int i = 0; i < count; i++) {
    char* name = taosArrayGetP(pArr, i);

    // `used` always equals strlen(out): every accepted write appends exactly nBytes.
    int32_t left = cap - used;
    int32_t nBytes = snprintf(out + used, left, "%s,", name);
    if (nBytes <= 0 || nBytes >= left) {
      int32_t code = TSDB_CODE_OUT_OF_RANGE;
      stError("failed to debug snapshot info since %s", tstrerror(code));
      taosMemoryFree(out);
      return;
    }
    used += nBytes;
  }

  // At least one entry was written, so there is a trailing comma to drop.
  out[used - 1] = 0;

  *buf = out;
}
4810
// When DEBUG_INFO is set in stDebugFlag, trace-log four file lists of pDb: the sst table at
// index `idx`, the sst table at index `1 - idx`, the pAdd array and the pDel array.
// Each list is rendered into a temporary string that is freed before returning.
void dbChkpDebugInfo(SDbChkp* pDb) {
  if ((stDebugFlag & DEBUG_INFO) == 0) {
    return;
  }

  char* prevFiles = NULL;
  char* currFiles = NULL;
  char* addedFiles = NULL;
  char* deletedFiles = NULL;

  hashTableToDebug(pDb->pSstTbl[pDb->idx], &prevFiles);
  if (prevFiles != NULL) {
    stTrace("chkp previous file: [%s]", prevFiles);
  }

  hashTableToDebug(pDb->pSstTbl[1 - pDb->idx], &currFiles);
  if (currFiles != NULL) {
    stTrace("chkp curr file: [%s]", currFiles);
  }

  strArrayDebugInfo(pDb->pAdd, &addedFiles);
  if (addedFiles != NULL) {
    stTrace("chkp newly addded file: [%s]", addedFiles);
  }

  strArrayDebugInfo(pDb->pDel, &deletedFiles);
  if (deletedFiles != NULL) {
    stTrace("chkp newly deleted file: [%s]", deletedFiles);
  }

  // a list that could not be rendered stays NULL here
  taosMemoryFree(prevFiles);
  taosMemoryFree(currFiles);
  taosMemoryFree(addedFiles);
  taosMemoryFree(deletedFiles);
}
3✔
4831
// Rebuild the add/delete file lists of `p` for checkpoint `chkpId`.
//
// The directory <path>/checkpoints/checkpoint<chkpId> is scanned: the CURRENT file name and
// the MANIFEST-* file name are remembered in p->pCurrent / p->pManifest, and every name ending
// in ".sst" is inserted into the hash table pSstTbl[1 - idx].
//   - first call (p->init == 0): every collected name for which isBkdDataMeta() is false is
//     copied into p->pAdd;
//   - later calls: compareHashTable() fills p->pAdd / p->pDel from the two tables.
// On success the active table index is flipped.
//
// p:      checkpoint tracker; its rwLock is write-locked for the whole call and released on
//         every exit path.
// chkpId: id of the checkpoint directory to scan.
// list:   not used by this function.
// return: 0 on success, otherwise an error code.
int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
  int32_t       code = 0;
  int32_t       nBytes = 0;
  TdDirPtr      pDir = NULL;
  TdDirEntryPtr de = NULL;
  int8_t        dummy = 0;

  TAOS_UNUSED(taosThreadRwlockWrlock(&p->rwLock));

  p->preCkptId = p->curChkpId;
  p->curChkpId = chkpId;
  const char* pCurrent = "CURRENT";
  int32_t     currLen = strlen(pCurrent);

  const char* pManifest = "MANIFEST-";
  int32_t     maniLen = strlen(pManifest);

  const char* pSST = ".sst";
  int32_t     sstLen = strlen(pSST);

  memset(p->buf, 0, p->len);

  nBytes =
      snprintf(p->buf, p->len, "%s%s%s%scheckpoint%" PRId64, p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
  if (nBytes <= 0 || nBytes >= p->len) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _EXIT;
  }

  taosArrayClearP(p->pAdd, NULL);
  taosArrayClearP(p->pDel, NULL);
  taosHashClear(p->pSstTbl[1 - p->idx]);

  pDir = taosOpenDir(p->buf);
  if (pDir == NULL) {
    // read terrno before any other call gets a chance to overwrite it
    code = terrno;
    goto _EXIT;
  }

  while ((de = taosReadDir(pDir)) != NULL) {
    char* name = taosGetDirEntryName(de);
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;

    size_t nameLen = strlen(name);

    if (nameLen == (size_t)currLen && strcmp(name, pCurrent) == 0) {
      taosMemoryFreeClear(p->pCurrent);

      p->pCurrent = taosStrdup(name);
      if (p->pCurrent == NULL) {
        code = terrno;
        break;
      }
      continue;
    }

    if (nameLen >= (size_t)maniLen && strncmp(name, pManifest, maniLen) == 0) {
      taosMemoryFreeClear(p->pManifest);
      p->pManifest = taosStrdup(name);
      if (p->pManifest == NULL) {
        code = terrno;
        break;
      }
      continue;
    }

    // compare the last sstLen characters with the ".sst" suffix
    if (nameLen >= (size_t)sstLen && strncmp(name + nameLen - sstLen, pSST, sstLen) == 0) {
      // a failed insert must be reported; otherwise the delta would be computed from an
      // incomplete file table while the caller sees success
      code = taosHashPut(p->pSstTbl[1 - p->idx], name, nameLen, &dummy, sizeof(dummy));
      if (code != 0) {
        break;
      }
      continue;
    }
  }
  TAOS_UNUSED(taosCloseDir(&pDir));
  if (code != 0) {
    goto _EXIT;
  }

  if (p->init == 0) {
    void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
    while (pIter) {
      size_t len = 0;
      char*  name = taosHashGetKey(pIter, &len);
      if (name != NULL && !isBkdDataMeta(name, len)) {
        int32_t cap = len + 1;
        char*   fname = taosMemoryCalloc(1, cap);
        if (fname == NULL) {
          // NOTE(review): this leaves the loop mid-iteration; confirm whether the hash
          // iterator has to be released explicitly on this path
          code = terrno;
          goto _EXIT;
        }

        tstrncpy(fname, name, cap);
        if (taosArrayPush(p->pAdd, &fname) == NULL) {
          code = terrno;
          taosMemoryFree(fname);
          goto _EXIT;
        }
      }
      pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
    }
    if (taosArrayGetSize(p->pAdd) > 0) p->update = 1;

    p->init = 1;
    p->preCkptId = -1;
    p->curChkpId = chkpId;
  } else {
    code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel);
    if (code != 0) {
      // discard the partial delta; the lock is released at _EXIT
      taosArrayClearP(p->pAdd, NULL);
      taosArrayClearP(p->pDel, NULL);
      taosHashClear(p->pSstTbl[1 - p->idx]);
      p->update = 0;
      goto _EXIT;
    }

    if (taosArrayGetSize(p->pAdd) == 0 && taosArrayGetSize(p->pDel) == 0) {
      p->update = 0;
    }

    // NOTE(review): curChkpId was already set to chkpId at the top of this function, so this
    // assignment stores chkpId in preCkptId as well -- confirm that is intended
    p->preCkptId = p->curChkpId;
    p->curChkpId = chkpId;
  }

  dbChkpDebugInfo(p);

  // the table filled by this call becomes the reference table of the next call
  p->idx = 1 - p->idx;

_EXIT:
  TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
  return code;
}
4958

4959
// Forward declaration: dbChkpCreate() below calls dbChkpDestroy() before its definition.
void dbChkpDestroy(SDbChkp* pChkp);
4960

4961
// Allocate a checkpoint tracker for the directory `path` and compute its first delta by
// calling dbChkpGetDelta() with initChkpId.
//
// path:       heap string naming the task directory. On success the new object stores it and
//             dbChkpDestroy() frees it. On failure it is NOT freed here and stays owned by
//             the caller.
// initChkpId: id of the first checkpoint to scan.
// ppChkp:     out parameter, written only on success.
// return:     0 on success, otherwise an error code.
int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) {
  int32_t  code = 0;
  SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp));
  if (p == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->curChkpId = initChkpId;
  p->preCkptId = -1;
  p->pSST = taosArrayInit(64, sizeof(void*));
  if (p->pSST == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->path = path;
  p->len = strlen(path) + 128;
  p->buf = taosMemoryCalloc(1, p->len);
  if (p->buf == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->idx = 0;
  p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (p->pSstTbl[0] == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (p->pSstTbl[1] == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->pAdd = taosArrayInit(64, sizeof(void*));
  if (p->pAdd == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->pDel = taosArrayInit(64, sizeof(void*));
  if (p->pDel == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->update = 0;
  TAOS_UNUSED(taosThreadRwlockInit(&p->rwLock, NULL));

  code = dbChkpGetDelta(p, initChkpId, NULL);
  if (code != 0) {
    goto _EXIT;
  }
  *ppChkp = p;
  return code;

_EXIT:
  if (p != NULL) {
    // dbChkpDestroy() frees p->path. Detach it first so that a failure never releases the
    // caller's string: callers free `path` themselves when this function fails, and freeing
    // it here as well would be a double free.
    p->path = NULL;
  }
  dbChkpDestroy(p);
  return code;
}
5025

5026
// Release a checkpoint tracker: its scratch buffer, path string, the pSST / pAdd / pDel
// arrays, both sst hash tables, the remembered CURRENT and MANIFEST names, and finally the
// object itself. A NULL argument is ignored.
void dbChkpDestroy(SDbChkp* pChkp) {
  if (pChkp != NULL) {
    taosMemoryFree(pChkp->buf);
    taosMemoryFree(pChkp->path);

    taosArrayDestroyP(pChkp->pSST, NULL);
    taosArrayDestroyP(pChkp->pAdd, NULL);
    taosArrayDestroyP(pChkp->pDel, NULL);

    for (int32_t i = 0; i < 2; ++i) {
      taosHashCleanup(pChkp->pSstTbl[i]);
    }

    taosMemoryFree(pChkp->pCurrent);
    taosMemoryFree(pChkp->pManifest);
    taosMemoryFree(pChkp);
  }
}
5043
#ifdef BUILD_NO_CALL
5044
// Placeholder initializer: performs no work and returns 0 for every argument, NULL included.
int32_t dbChkpInit(SDbChkp* p) {
  (void)p;
  return 0;
}
5048
#endif
5049
// Materialize the current delta of `p` in the directory `dname`.
//
// Steps, all performed while holding p->rwLock as a reader:
//   1. copy every file named in p->pAdd from <path>/checkpoints/checkpoint<curChkpId> to dname;
//   2. append a duplicated copy of every name in p->pDel to `list`;
//   3. copy the CURRENT and MANIFEST files to dname, each with a "_<curChkpId>" suffix;
//   4. write the file dname/META, formatted with META_ON_S3_FORMATE;
//   5. clear p->pAdd and p->pDel.
//
// p:      checkpoint tracker filled by dbChkpGetDelta().
// dname:  destination directory.
// list:   receives heap copies of the deleted file names.
// return: 0 on success, otherwise an error code. Files copied before a failure are left in place.
//
// NOTE(review): step 5 mutates p->pAdd / p->pDel although only the read lock is held --
// confirm that callers serialize access by other means.
int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
  static char* chkpMeta = "META";
  int32_t      code = 0;

  TAOS_UNUSED(taosThreadRwlockRdlock(&p->rwLock));

  int32_t cap = p->len + 128;

  // one allocation carved into four path buffers of `cap` bytes each
  char* buffer = taosMemoryCalloc(4, cap);
  if (buffer == NULL) {
    code = terrno;
    goto _ERROR;
  }

  char* srcBuf = buffer;
  char* dstBuf = &srcBuf[cap];
  char* srcDir = &dstBuf[cap];
  char* dstDir = &srcDir[cap];

  // both names are formatted with "%s" below; a NULL argument there is undefined behavior
  if (p->pCurrent == NULL || p->pManifest == NULL) {
    stError("failed to dump checkpoint:%" PRId64 ", reason: CURRENT or MANIFEST file name is missing", p->curChkpId);
    code = TSDB_CODE_INVALID_PARA;
    goto _ERROR;
  }

  int nBytes = snprintf(srcDir, cap, "%s%s%s%s%s%" PRId64, p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP,
                        "checkpoint", p->curChkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  nBytes = snprintf(dstDir, cap, "%s", dname);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  if (!taosDirExist(srcDir)) {
    stError("failed to dump srcDir %s, reason: not exist such dir", srcDir);
    code = TSDB_CODE_INVALID_PARA;
    goto _ERROR;
  }
  int64_t chkpId = 0, processId = -1;
  code = chkpLoadExtraInfo(srcDir, &chkpId, &processId);
  if (code < 0) {
    // report the actual error text; code is always non-zero in this branch
    stError("failed to load extra info from %s, reason:%s", srcDir, tstrerror(code));

    goto _ERROR;
  }

  // add file to $name dir
  for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) {
    memset(srcBuf, 0, cap);
    memset(dstBuf, 0, cap);

    char* filename = taosArrayGetP(p->pAdd, i);
    nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, filename);
    if (nBytes <= 0 || nBytes >= cap) {
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _ERROR;
    }

    nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, filename);
    if (nBytes <= 0 || nBytes >= cap) {
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _ERROR;
    }

    if (taosCopyFile(srcBuf, dstBuf) < 0) {
      code = TAOS_SYSTEM_ERROR(ERRNO);
      stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
      goto _ERROR;
    }
  }
  // del file in $name
  for (int i = 0; i < taosArrayGetSize(p->pDel); i++) {
    char* filename = taosArrayGetP(p->pDel, i);
    // named so that it does not shadow the `p` parameter
    char* nameCopy = taosStrdup(filename);
    if (nameCopy == NULL) {
      code = terrno;
      goto _ERROR;
    }
    if (taosArrayPush(list, &nameCopy) == NULL) {
      // capture the error before the free gets a chance to overwrite terrno
      code = terrno;
      taosMemoryFree(nameCopy);
      goto _ERROR;
    }
  }

  // copy current file to dst dir
  memset(srcBuf, 0, cap);
  memset(dstBuf, 0, cap);

  nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64, dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  if (taosCopyFile(srcBuf, dstBuf) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
    goto _ERROR;
  }

  // copy manifest file to dst dir
  memset(srcBuf, 0, cap);
  memset(dstBuf, 0, cap);

  nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64, dstDir, TD_DIRSEP, p->pManifest, p->curChkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  if (taosCopyFile(srcBuf, dstBuf) < 0) {
    code = terrno;
    stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
    goto _ERROR;
  }
  memset(dstBuf, 0, cap);
  nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  TdFilePtr pFile = taosOpenFile(dstBuf, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    code = terrno;
    stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(code));
    goto _ERROR;
  }

  char content[256] = {0};
  nBytes = tsnprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest,
                     p->curChkpId, "processVer", processId);
  if (nBytes <= 0 || (size_t)nBytes >= sizeof(content)) {
    code = TSDB_CODE_OUT_OF_RANGE;
    stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir);
    TAOS_UNUSED(taosCloseFile(&pFile));
    goto _ERROR;
  }

  nBytes = taosWriteFile(pFile, content, strlen(content));
  if (nBytes != strlen(content)) {
    code = terrno;
    stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(code));
    TAOS_UNUSED(taosCloseFile(&pFile));
    goto _ERROR;
  }
  TAOS_UNUSED(taosCloseFile(&pFile));

  // clear delta data buf
  taosArrayClearP(p->pAdd, NULL);
  taosArrayClearP(p->pDel, NULL);
  code = 0;

_ERROR:
  // shared exit for success and failure: release the scratch buffer and the lock
  taosMemoryFree(buffer);
  TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
  return code;
}
5219

5220
// Create a backend manager: a zeroed SBkdMgt with an empty task -> checkpoint hash table,
// a private copy of `path`, and an initialized rwLock.
//
// path:   directory string; it is duplicated, the argument itself is not retained.
// mgt:    out parameter, written only on success.
// return: 0 on success, otherwise an error code. A partially built object is released
//         through bkdMgtDestroy().
int32_t bkdMgtCreate(char* path, SBkdMgt** mgt) {
  SBkdMgt* pMgt = taosMemoryCalloc(1, sizeof(SBkdMgt));
  if (pMgt == NULL) {
    return terrno;
  }

  int32_t code = 0;

  pMgt->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (pMgt->pDbChkpTbl == NULL) {
    code = terrno;
    goto _FAIL;
  }

  pMgt->path = taosStrdup(path);
  if (pMgt->path == NULL) {
    code = terrno;
    goto _FAIL;
  }

  if (taosThreadRwlockInit(&pMgt->rwLock, NULL) != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _FAIL;
  }

  *mgt = pMgt;
  return code;

_FAIL:
  // NOTE(review): bkdMgtDestroy() also destroys rwLock, which has not been successfully
  // initialized on any path reaching this label -- confirm that is safe on all platforms
  bkdMgtDestroy(pMgt);
  return code;
}
5250

5251
// Tear down a backend manager: destroy every checkpoint tracker stored in its hash table,
// then the lock, the path string, the table and the object itself. NULL is ignored.
void bkdMgtDestroy(SBkdMgt* bm) {
  if (bm == NULL) {
    return;
  }

  // table values are SDbChkp* pointers, so the iterator addresses an SDbChkp*
  for (void* pIter = taosHashIterate(bm->pDbChkpTbl, NULL); pIter != NULL;
       pIter = taosHashIterate(bm->pDbChkpTbl, pIter)) {
    SDbChkp* pTracker = *(SDbChkp**)pIter;
    dbChkpDestroy(pTracker);
  }

  TAOS_UNUSED(taosThreadRwlockDestroy(&bm->rwLock));
  taosMemoryFree(bm->path);
  taosHashCleanup(bm->pDbChkpTbl);

  taosMemoryFree(bm);
}
5267
// Compute the file delta of task `taskId` for checkpoint `chkpId` and dump it into `dname`.
//
// If no tracker is registered for the task, one is created for <bm->path>/<taskId> (creation
// already computes the first delta) and stored in bm->pDbChkpTbl. Otherwise the existing
// tracker is refreshed with dbChkpGetDelta(). In both cases the delta is then written with
// dbChkpDumpTo().
//
// bm:     backend manager; its rwLock is write-locked for the whole call.
// taskId: key of the task in bm->pDbChkpTbl, also used as directory name.
// chkpId: checkpoint to process.
// list:   forwarded to dbChkpDumpTo(), which appends deleted file names to it.
// dname:  destination directory, forwarded to dbChkpDumpTo().
// return: 0 on success, otherwise an error code.
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* dname) {
  int32_t code = 0;
  TAOS_UNUSED(taosThreadRwlockWrlock(&bm->rwLock));
  SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
  SDbChkp*  pChkp = ppChkp != NULL ? *ppChkp : NULL;

  if (pChkp == NULL) {
    int32_t cap = strlen(bm->path) + 64;
    char*   path = taosMemoryCalloc(1, cap);
    if (path == NULL) {
      // read terrno before the unlock at _EXIT can overwrite it
      code = terrno;
      goto _EXIT;
    }

    int32_t nBytes = snprintf(path, cap, "%s%s%s", bm->path, TD_DIRSEP, taskId);
    if (nBytes <= 0 || nBytes >= cap) {
      taosMemoryFree(path);
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _EXIT;
    }

    SDbChkp* p = NULL;
    code = dbChkpCreate(path, chkpId, &p);
    if (code != 0) {
      // no tracker was produced, so the path string is released here
      taosMemoryFree(path);
      goto _EXIT;
    }

    // use the status returned by the insert itself instead of reading terrno after cleanup
    code = taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*));
    if (code != 0) {
      // the tracker stores `path`, so destroying it releases the string as well
      dbChkpDestroy(p);
      goto _EXIT;
    }

    pChkp = p;
  } else {
    code = dbChkpGetDelta(pChkp, chkpId, NULL);
    if (code != 0) {
      goto _EXIT;
    }
  }

  code = dbChkpDumpTo(pChkp, dname, list);

_EXIT:
  TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));
  return code;
}
5318

5319
#ifdef BUILD_NO_CALL
5320
// Register a new checkpoint tracker for `task`, rooted at `path`, in bm->pDbChkpTbl.
//
// return: 0 when a tracker was created and stored; the error code of dbChkpCreate() or
//         taosHashPut() when one of them fails; -1 when the task already has an entry.
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) {
  int32_t code = -1;

  taosThreadRwlockWrlock(&bm->rwLock);
  SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
  if (pp == NULL) {
    SDbChkp* p = NULL;
    code = dbChkpCreate(path, 0, &p);
    if (code == 0) {
      // store the tracker only when it was actually created; a failed creation must not
      // insert a NULL entry and report success
      code = taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*));
      // NOTE(review): if the insert fails, `p` is not released here -- confirm and add cleanup
    }
  } else {
    stError("task chkp already exists");
  }

  taosThreadRwlockUnlock(&bm->rwLock);

  return code;
}
5340

5341
// Dump the current delta of task `taskId` into `dname` via dbChkpDumpTo(), passing no
// deleted-file list.
//
// return: the result of dbChkpDumpTo(), or -1 when the task has no tracker in bm->pDbChkpTbl.
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) {
  int32_t code = -1;
  taosThreadRwlockRdlock(&bm->rwLock);

  // the table stores SDbChkp* values, so a lookup yields a pointer to that pointer
  SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
  if (pp != NULL && *pp != NULL) {
    code = dbChkpDumpTo(*pp, dname, NULL);
  }

  taosThreadRwlockUnlock(&bm->rwLock);
  return code;
}
5351
#endif
5352

5353
// Build a cursor over the "state" column family, seek it with the encoded (key, pState->number)
// pair, step backwards over entries reported stale by iterValueIsStale(), and step back once
// more unless the search key compares greater than the key under the cursor.
//
// The backend is pOwner->pRecalBackend when pTdbState->recalc is set, pOwner->pBackend otherwise.
//
// return: the new cursor, or NULL when the cursor cannot be allocated, the seek is not valid,
//         or the iterator is no longer valid after skipping stale entries. On those NULL paths
//         an already created cursor is released with streamStateFreeCur().
SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateSeekKeyPrev_rocksdb");

  STaskDbWrapper* pWrapper =
      pState->pTdbState->recalc ? pState->pTdbState->pOwner->pRecalBackend : pState->pTdbState->pOwner->pBackend;

  SStreamStateCur* pCursor = createStreamStateCursor();
  if (pCursor == NULL) {
    return NULL;
  }

  pCursor->db = pWrapper->db;
  pCursor->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCursor->snapshot,
                                        (rocksdb_readoptions_t**)&pCursor->readOpt);
  pCursor->number = pState->number;

  SStateKey seekKey = {.key = *key, .opNum = pState->number};
  char      encoded[128] = {0};
  int       encodedLen = stateKeyEncode((void*)&seekKey, encoded);
  if (!streamStateIterSeekAndValid(pCursor->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCursor);
    return NULL;
  }

  while (rocksdb_iter_valid(pCursor->iter) && iterValueIsStale(pCursor->iter)) {
    rocksdb_iter_prev(pCursor->iter);
  }

  if (!rocksdb_iter_valid(pCursor->iter)) {
    streamStateFreeCur(pCursor);
    return NULL;
  }

  SWinKey foundKey;
  size_t  rawLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(pCursor->iter, &rawLen);
  TAOS_UNUSED(winKeyDecode((void*)&foundKey, rawKey));

  // the cursor is returned either way; it only moves back when the search key is not greater
  if (!(winKeyCmpr(&seekKey, sizeof(seekKey), &foundKey, sizeof(foundKey)) > 0)) {
    rocksdb_iter_prev(pCursor->iter);
  }
  return pCursor;
}
5396

5397
// Read the entry under pCur with streamStateGetKVByCur_rocksdb() and accept it only when its
// group id equals the group id that pKey carried on entry.
//
// return: 0 when the read succeeds and the group id is unchanged; -1 when pCur is NULL, the
//         read fails, or the group id differs. On a group mismatch the value returned through
//         pVal (if pVal is non-NULL) is freed and *pVal is reset to NULL.
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey,
                                           const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }

  // remember the requested group: the read below overwrites *pKey
  const uint64_t wantedGroup = pKey->groupId;

  if (streamStateGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen) != 0) {
    return -1;
  }

  if (pKey->groupId == wantedGroup) {
    return 0;
  }

  if (pVal != NULL) {
    taosMemoryFree((void*)*pVal);
    *pVal = NULL;
  }
  return -1;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc