• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.05
/source/libs/stream/src/streamBackendRocksdb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "lz4.h"
18
#include "streamInt.h"
19
#include "tcommon.h"
20
#include "tref.h"
21

22
/* State object for the compaction-filter factory callbacks declared in this file. */
typedef struct SCompactFilteFactory {
  /* Opaque payload; nothing in the visible part of this file interprets it. */
  void* status;
} SCompactFilteFactory;
25

26
typedef struct {
27
  rocksdb_t*                       db;
28
  rocksdb_column_family_handle_t** pHandle;
29
  rocksdb_writeoptions_t*          wOpt;
30
  rocksdb_readoptions_t*           rOpt;
31
  rocksdb_options_t**              cfOpt;
32
  rocksdb_options_t*               dbOpt;
33
  RocksdbCfParam*                  param;
34
  void*                            pBackend;
35
  SListNode*                       pCompareNode;
36
  rocksdb_comparator_t**           pCompares;
37
} RocksdbCfInst;
38

39
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
40

41
void            destroyRocksdbCfInst(RocksdbCfInst* inst);
42
int32_t         getCfIdx(const char* cfName);
43
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath);
44

45
static int32_t backendCopyFiles(const char* src, const char* dst);
46

47
void        destroyCompactFilteFactory(void* arg);
48
void        destroyCompactFilte(void* arg);
49
const char* compactFilteFactoryName(void* arg);
50
const char* compactFilteFactoryNameSess(void* arg);
51
const char* compactFilteFactoryNameState(void* arg);
52
const char* compactFilteFactoryNameFunc(void* arg);
53
const char* compactFilteFactoryNameFill(void* arg);
54

55
const char* compactFilteName(void* arg);
56
const char* compactFilteNameSess(void* arg);
57
const char* compactFilteNameState(void* arg);
58
const char* compactFilteNameFill(void* arg);
59
const char* compactFilteNameFunc(void* arg);
60

61
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
62
                           char** newval, size_t* newvlen, unsigned char* value_changed);
63
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);
64
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx);
65
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx);
66
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx);
67
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx);
68

69
typedef int (*__db_key_encode_fn_t)(void* key, char* buf);
70
typedef int (*__db_key_decode_fn_t)(void* key, char* buf);
71
typedef int (*__db_key_tostr_fn_t)(void* key, char* buf);
72
typedef const char* (*__db_key_cmpname_fn_t)(void* statue);
73
typedef int (*__db_key_cmp_fn_t)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
74
typedef void (*__db_key_cmp_destroy_fn_t)(void* state);
75
typedef int32_t (*__db_value_encode_fn_t)(void* value, int32_t vlen, int64_t ttl, char** dest);
76
typedef int32_t (*__db_value_decode_fn_t)(void* value, int32_t vlen, int64_t* ttl, char** dest);
77

78
typedef rocksdb_compactionfilter_t* (*__db_factory_create_fn_t)(void* arg, rocksdb_compactionfiltercontext_t* ctx);
79
typedef const char* (*__db_factory_name_fn_t)(void* arg);
80
typedef void (*__db_factory_destroy_fn_t)(void* arg);
81
typedef struct {
82
  const char*               key;
83
  int32_t                   len;
84
  int                       idx;
85
  __db_key_cmp_fn_t         cmpKey;
86
  __db_key_encode_fn_t      enFunc;
87
  __db_key_decode_fn_t      deFunc;
88
  __db_key_tostr_fn_t       toStrFunc;
89
  __db_key_cmpname_fn_t     cmpName;
90
  __db_key_cmp_destroy_fn_t destroyCmp;
91
  __db_value_encode_fn_t    enValueFunc;
92
  __db_value_decode_fn_t    deValueFunc;
93

94
  __db_factory_create_fn_t  createFilter;
95
  __db_factory_destroy_fn_t destroyFilter;
96
  __db_factory_name_fn_t    funcName;
97

98
} SCfInit;
99

100
const char* compareDefaultName(void* name);
101
const char* compareStateName(void* name);
102
const char* compareWinKeyName(void* name);
103
const char* compareSessionKeyName(void* name);
104
const char* compareFuncKeyName(void* name);
105
const char* compareParKeyName(void* name);
106
const char* comparePartagKeyName(void* name);
107

108
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
109
int defaultKeyEncode(void* k, char* buf);
110
int defaultKeyDecode(void* k, char* buf);
111
int defaultKeyToString(void* k, char* buf);
112

113
int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
114
int stateKeyEncode(void* k, char* buf);
115
int stateKeyDecode(void* k, char* buf);
116
int stateKeyToString(void* k, char* buf);
117

118
int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
119
int stateSessionKeyEncode(void* ses, char* buf);
120
int stateSessionKeyDecode(void* ses, char* buf);
121
int stateSessionKeyToString(void* k, char* buf);
122

123
int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
124
int winKeyEncode(void* k, char* buf);
125
int winKeyDecode(void* k, char* buf);
126
int winKeyToString(void* k, char* buf);
127

128
int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
129
int tupleKeyEncode(void* k, char* buf);
130
int tupleKeyDecode(void* k, char* buf);
131
int tupleKeyToString(void* k, char* buf);
132

133
int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
134
int parKeyEncode(void* k, char* buf);
135
int parKeyDecode(void* k, char* buf);
136
int parKeyToString(void* k, char* buf);
137

138
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest);
139
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest);
140
int32_t valueToString(void* k, char* buf);
141
int32_t valueIsStale(void* k, int64_t ts);
142

143
void        destroyCompare(void* arg);
144
static void cleanDir(const char* pPath, const char* id);
145

146
static bool                streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
147
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
148
                                                 rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
149

150
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
151
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
152

153
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId);
154
int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId);
155

156
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
157
int32_t  copyFiles(const char* src, const char* dst);
158
uint32_t nextPow2(uint32_t x);
159

160
SCfInit ginitDict[] = {
161
    {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
162
     destroyCompare, valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory,
163
     compactFilteFactoryName},
164

165
    {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyCompare,
166
     valueEncode, valueDecode, compactFilteFactoryCreateFilterState, destroyCompactFilteFactory,
167
     compactFilteFactoryNameState},
168

169
    {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyCompare,
170
     valueEncode, valueDecode, compactFilteFactoryCreateFilterFill, destroyCompactFilteFactory,
171
     compactFilteFactoryNameFill},
172

173
    {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
174
     compareSessionKeyName, destroyCompare, valueEncode, valueDecode, compactFilteFactoryCreateFilterSess,
175
     destroyCompactFilteFactory, compactFilteFactoryNameSess},
176

177
    {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyCompare,
178
     valueEncode, valueDecode, compactFilteFactoryCreateFilterFunc, destroyCompactFilteFactory,
179
     compactFilteFactoryNameFunc},
180

181
    {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyCompare,
182
     valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
183

184
    {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyCompare,
185
     valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
186
};
187

188
int32_t getCfIdx(const char* cfName) {
5,518✔
189
  int    idx = -1;
5,518✔
190
  size_t len = strlen(cfName);
5,518✔
191
  for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
5,702!
192
    if (len == ginitDict[i].len && strncmp(cfName, ginitDict[i].key, strlen(cfName)) == 0) {
5,702✔
193
      idx = i;
5,518✔
194
      break;
5,518✔
195
    }
196
  }
197
  return idx;
5,518✔
198
}
199

200
/*
 * Placeholder validity check for a checkpoint directory.
 * No validation is implemented yet: every directory is reported as valid and
 * the argument is not inspected.
 */
bool isValidCheckpoint(const char* dir) {
  (void)dir;  // intentionally unused until real validation exists
  return true;
}
204

205
/*
206
 *copy pChkpIdDir's file to state dir
207
 */
208
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
×
209
  // impl later
210
  int32_t code = 0;
×
211
  int32_t cap = strlen(path) + 64;
×
212
  int32_t nBytes = 0;
×
213

214
  char* state = taosMemoryCalloc(1, cap);
×
215
  if (state == NULL) {
×
216
    return terrno;
×
217
  }
218

219
  nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state");
×
220
  if (nBytes <= 0 || nBytes >= cap) {
×
221
    taosMemoryFree(state);
×
222
    return TSDB_CODE_OUT_OF_RANGE;
×
223
  }
224

225
  if (chkpId != 0) {
×
226
    char* chkp = taosMemoryCalloc(1, cap);
×
227
    if (chkp == NULL) {
×
228
      taosMemoryFree(state);
×
229
      return terrno;
×
230
    }
231

232
    nBytes = snprintf(chkp, cap, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
×
233
    if (nBytes <= 0 || nBytes >= cap) {
×
234
      taosMemoryFree(state);
×
235
      taosMemoryFree(chkp);
×
236
      return TSDB_CODE_OUT_OF_RANGE;
×
237
    }
238

239
    if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
×
240
      cleanDir(state, "");
×
241
      code = backendCopyFiles(chkp, state);
×
242
      if (code != 0) {
×
243
        stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(code)));
×
244
      } else {
245
        stInfo("start to restart stream backend at checkpoint path: %s", chkp);
×
246
      }
247

248
    } else {
249
      stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp,
×
250
              tstrerror(terrno), state);
251
      code = taosMkDir(state);
×
252
      if (code != 0) {
×
253
        code = TAOS_SYSTEM_ERROR(errno);
×
254
      }
255
    }
256

257
    taosMemoryFree(chkp);
×
258
  }
259

260
  *dst = state;
×
261
  return code;
×
262
}
263

264
/*
 * Parsed content of a checkpoint "META" file (filled by
 * remoteChkp_readMetaData via sscanf): three name/id pairs.
 */
typedef struct {
  // first pair: CURRENT-file name and the checkpoint id it was saved under
  char    pCurrName[24];
  int64_t currChkptId;

  // second pair: MANIFEST-file name and its checkpoint id
  char    pManifestName[24];
  int64_t manifestChkptId;

  // third pair: label and value of the process id / version
  char    processName[24];
  int64_t processId;
} SSChkpMetaOnS3;
274

275
/*
 * Read and parse "<path>/META" of a downloaded checkpoint.
 *
 * Fixes relative to the previous version:
 *  - the META file itself is opened (the directory `path` used to be opened,
 *    leaving the computed metaPath unused);
 *  - at most sizeof(buf) - 1 bytes are read, so the buffer handed to sscanf
 *    is always NUL-terminated;
 *  - a path that does not fit reports TSDB_CODE_OUT_OF_RANGE like the sibling
 *    functions (was TSDB_CODE_OUT_OF_MEMORY).
 *
 * @param path   checkpoint directory
 * @param pMeta  receives a heap-allocated SSChkpMetaOnS3 on success; the
 *               caller frees it with taosMemoryFree. Untouched on failure.
 * @return 0 on success; terrno on alloc/open/read failure;
 *         TSDB_CODE_INVALID_MSG when the file does not contain six fields or
 *         the two checkpoint ids disagree.
 */
int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) {
  int32_t         code = 0;
  int32_t         cap = strlen(path) + 32;
  TdFilePtr       pFile = NULL;
  SSChkpMetaOnS3* p = NULL;
  char            buf[256] = {0};

  char* metaPath = taosMemoryCalloc(1, cap);
  if (metaPath == NULL) {
    return terrno;
  }

  int32_t n = snprintf(metaPath, cap, "%s%s%s", path, TD_DIRSEP, "META");
  if (n <= 0 || n >= cap) {
    taosMemoryFree(metaPath);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  pFile = taosOpenFile(metaPath, TD_FILE_READ);
  if (pFile == NULL) {
    code = terrno;
    goto _EXIT;
  }

  // keep the last byte as the terminator for sscanf
  if (taosReadFile(pFile, buf, sizeof(buf) - 1) <= 0) {
    code = terrno;
    goto _EXIT;
  }

  p = taosMemoryCalloc(1, sizeof(SSChkpMetaOnS3));
  if (p == NULL) {
    code = terrno;
    goto _EXIT;
  }

  // NOTE(review): the name fields are 24 bytes each; confirm that
  // META_ON_S3_FORMATE bounds its %s conversions accordingly.
  n = sscanf(buf, META_ON_S3_FORMATE, p->pCurrName, &p->currChkptId, p->pManifestName, &p->manifestChkptId,
             p->processName, &p->processId);
  if (n != 6) {
    code = TSDB_CODE_INVALID_MSG;
    goto _EXIT;
  }

  if (p->currChkptId != p->manifestChkptId) {
    code = TSDB_CODE_INVALID_MSG;
    goto _EXIT;
  }

  // success: ownership moves to the caller
  *pMeta = p;
  p = NULL;
  code = 0;

_EXIT:
  taosMemoryFree(p);  // non-NULL only on a failure after allocation
  taosCloseFile(&pFile);
  taosMemoryFree(metaPath);
  return code;
}
328

329
/*
 * Check that the META record belongs to chkpId and strip the "_<chkpId>"
 * suffix from the two files it names, i.e. rename
 *   <path>/<pCurrName>_<id>     -> <path>/<pCurrName>
 *   <path>/<pManifestName>_<id> -> <path>/<pManifestName>
 *
 * @return 0 on success; terrno on allocation/stat failure,
 *         TSDB_CODE_INVALID_CFG when the ids do not match chkpId,
 *         TSDB_CODE_INVALID_PARA for an empty file name,
 *         TSDB_CODE_OUT_OF_RANGE when a path does not fit, or the code
 *         returned by taosRenameFile.
 */
int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t chkpId) {
  int32_t ret = 0;
  int32_t written = 0;
  int32_t bufLen = strlen(path) + 64;

  char* from = taosMemoryCalloc(1, bufLen);
  char* to = taosMemoryCalloc(1, bufLen);
  if (from == NULL || to == NULL) {
    ret = terrno;
    goto _EXIT;
  }

  if (!(pMeta->currChkptId == chkpId && pMeta->manifestChkptId == chkpId)) {
    ret = TSDB_CODE_INVALID_CFG;
    goto _EXIT;
  }

  // pass 0 handles the CURRENT file, pass 1 the MANIFEST file
  for (int pass = 0; pass < 2; ++pass) {
    char* fname;
    if (pass == 0) {
      fname = pMeta->pCurrName;
    } else {
      fname = pMeta->pManifestName;
    }

    if (fname[0] == '\0') {
      ret = TSDB_CODE_INVALID_PARA;
      goto _EXIT;
    }

    written = snprintf(from, bufLen, "%s%s%s_%" PRId64 "", path, TD_DIRSEP, fname, pMeta->currChkptId);
    if (written <= 0 || written >= bufLen) {
      ret = TSDB_CODE_OUT_OF_RANGE;
      goto _EXIT;
    }

    // the suffixed file must exist before we try to rename it
    if (taosStatFile(from, NULL, NULL, NULL) != 0) {
      ret = terrno;
      goto _EXIT;
    }

    written = snprintf(to, bufLen, "%s%s%s", path, TD_DIRSEP, fname);
    if (written <= 0 || written >= bufLen) {
      ret = TSDB_CODE_OUT_OF_RANGE;
      goto _EXIT;
    }

    ret = taosRenameFile(from, to);
    if (ret != 0) {
      goto _EXIT;
    }

    memset(from, 0, bufLen);
    memset(to, 0, bufLen);
  }
  ret = 0;

_EXIT:
  taosMemoryFree(from);
  taosMemoryFree(to);
  return ret;
}
386
/*
 * Collect the names of the suffixed CURRENT/MANIFEST files
 * ("<name>_<currChkptId>") described by <path>/META.
 *
 * Fixes relative to the previous version: the META record is now released on
 * the success path as well (it used to leak), and terrno is captured before
 * any free call can disturb it.
 *
 * @param path   checkpoint directory containing the META file
 * @param toDel  array of char*; each pushed string is heap-allocated and
 *               owned by the array's owner afterwards. Entries pushed before
 *               a failure stay in the array.
 * @return 0 on success, otherwise the first error encountered.
 */
int32_t remoteChkpGetDelFile(char* path, SArray* toDel) {
  int32_t         code = 0;
  SSChkpMetaOnS3* pMeta = NULL;

  code = remoteChkp_readMetaData(path, &pMeta);
  if (code != 0) {
    return code;
  }

  for (int i = 0; i < 2; i++) {
    char* key = (i == 0 ? pMeta->pCurrName : pMeta->pManifestName);

    int32_t cap = strlen(key) + 32;
    char*   p = taosMemoryCalloc(1, cap);
    if (p == NULL) {
      code = terrno;
      break;
    }

    int32_t nBytes = snprintf(p, cap, "%s_%" PRId64 "", key, pMeta->currChkptId);
    if (nBytes <= 0 || nBytes >= cap) {
      taosMemoryFree(p);
      code = TSDB_CODE_OUT_OF_RANGE;
      break;
    }

    if (taosArrayPush(toDel, &p) == NULL) {
      code = terrno;  // read before freeing
      taosMemoryFree(p);
      break;
    }
  }

  // the META record is only needed while building the names
  taosMemoryFree(pMeta);
  return code;
}
421

422
/*
 * Empty a directory by removing it and creating it again.
 * Does nothing when pPath is not an existing directory.
 *
 * Fix relative to the previous version: the "succ" message is only logged
 * when the directory was actually re-created; it used to be logged right
 * after the failure message as well.
 *
 * @param pPath  directory to clear; NULL is reported and ignored
 * @param id     identifier used as log prefix
 */
void cleanDir(const char* pPath, const char* id) {
  if (pPath == NULL) {
    stError("%s try to clean dir, but path is NULL", id);
    return;
  }

  if (!taosIsDir(pPath)) {
    return;
  }

  taosRemoveDir(pPath);
  if (taosMkDir(pPath) != 0) {
    stError("%s failed to create dir:%s", id, pPath);
    return;
  }

  stInfo("%s clear dir:%s, succ", id, pPath);
}
436

437
/*
 * Make sure pPath exists as a directory.
 *
 * @return 0 when the directory is already there, otherwise whatever
 *         taosMulMkDir returns for creating it.
 */
int32_t createDirIfNotExist(const char* pPath) {
  if (taosIsDir(pPath)) {
    return 0;
  }
  return taosMulMkDir(pPath);
}
444

445
/*
 * Restore the backend directory from a checkpoint fetched through the
 * download helper (rsync backup type).
 *
 * Steps: drop any local copy of the checkpoint, clear defaultPath, download
 * the checkpoint into checkpointPath, then copy it into defaultPath.
 *
 * @return the download error if the download fails, otherwise the result of
 *         backendCopyFiles.
 */
int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) {
  if (taosIsDir(checkpointPath)) {
    taosRemoveDir(checkpointPath);
    stDebug("remove local checkpoint data dir:%s succ", checkpointPath);
  }

  cleanDir(defaultPath, key);
  stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);

  int32_t rc = streamTaskDownloadCheckpointData(key, checkpointPath, checkpointId);
  if (rc == 0) {
    stDebug("download remote checkpoint data for checkpointId:%" PRId64 ", %s", checkpointId, key);
    return backendCopyFiles(checkpointPath, defaultPath);
  }

  stError("failed to download checkpoint data:%s", key);
  return rc;
}
464

465
int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) {
×
466
  SSChkpMetaOnS3* pMeta = NULL;
×
467

468
  int32_t code = remoteChkp_readMetaData(chkpPath, &pMeta);
×
469
  if (code != 0) {
×
470
    return code;
×
471
  }
472

473
  if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) {
×
474
    taosMemoryFree(pMeta);
×
475
    return TSDB_CODE_INVALID_PARA;
×
476
  }
477

478
  code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId);
×
479
  if (code != 0) {
×
480
    taosMemoryFree(pMeta);
×
481
    return code;
×
482
  }
483
  taosMemoryFree(pMeta);
×
484

485
  return chkpAddExtraInfo(chkpPath, chkpId, pMeta->processId);
×
486
}
487

488
int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
×
489
  int8_t  rename = 0;
×
490
  int32_t code = streamTaskDownloadCheckpointData(key, chkpPath, chkpId);
×
491
  if (code != 0) {
×
492
    return code;
×
493
  }
494

495
  int32_t cap = strlen(defaultPath) + 32;
×
496

497
  char* defaultTmp = taosMemoryCalloc(1, cap);
×
498
  if (defaultTmp == NULL) {
×
499
    return terrno;
×
500
  }
501

502
  int32_t nBytes = snprintf(defaultPath, cap, "%s%s", defaultPath, "_tmp");
×
503
  if (nBytes <= 0 || nBytes >= cap) {
×
504
    taosMemoryFree(defaultPath);
×
505
    return TSDB_CODE_OUT_OF_RANGE;
×
506
  }
507

508
  if (taosIsDir(defaultTmp)) taosRemoveDir(defaultTmp);
×
509
  if (taosIsDir(defaultPath)) {
×
510
    code = taosRenameFile(defaultPath, defaultTmp);
×
511
    if (code != 0) {
×
512
      goto _EXIT;
×
513
    } else {
514
      rename = 1;
×
515
    }
516
  } else {
517
    code = taosMkDir(defaultPath);
×
518
    if (code != 0) {
×
519
      code = TAOS_SYSTEM_ERROR(errno);
×
520
      goto _EXIT;
×
521
    }
522
  }
523

524
  code = rebuildDataFromS3(chkpPath, chkpId);
×
525
  if (code != 0) {
×
526
    goto _EXIT;
×
527
  }
528

529
  code = backendCopyFiles(chkpPath, defaultPath);
×
530
  if (code != 0) {
×
531
    goto _EXIT;
×
532
  }
533
  code = 0;
×
534

535
_EXIT:
×
536
  if (code != 0) {
×
537
    if (rename) {
×
538
      TAOS_UNUSED(taosRenameFile(defaultTmp, defaultPath));
×
539
    }
540
  }
541

542
  if (taosIsDir(defaultPath)) {
×
543
    taosRemoveDir(defaultPath);
×
544
  }
545

546
  taosMemoryFree(defaultTmp);
×
547
  return code;
×
548
}
549

550
int32_t rebuildFromRemoteCheckpoint(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) {
10✔
551
  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
10✔
552
  if (type == DATA_UPLOAD_S3) {
10!
553
    return rebuildFromRemoteChkp_s3(key, checkpointPath, checkpointId, defaultPath);
×
554
  } else if (type == DATA_UPLOAD_RSYNC) {
10!
555
    return rebuildFromRemoteChkp_rsync(key, checkpointPath, checkpointId, defaultPath);
×
556
  } else {
557
    stError("%s no remote backup checkpoint data for:%" PRId64, key, checkpointId);
10!
558
  }
559

560
  return -1;
10✔
561
}
562

563
/*
 * Copy src to dst by creating a new file.
 *
 * The copy is best-effort: the result of taosCopyFile is not inspected and
 * the function always returns 0. If errno is EXDEV or ENOTSUP afterwards it
 * is reset to 0.
 * NOTE(review): callers test the return value for failure, which can never
 * trigger here -- confirm whether copy errors should be propagated.
 *
 * @param type  unused
 */
int32_t copyFiles_create(char* src, char* dst, int8_t type) {
  (void)taosCopyFile(src, dst);

  const int cross = (errno == EXDEV || errno == ENOTSUP);
  if (cross) {
    errno = 0;
  }
  return 0;
}
573
/*
 * "Copy" src to dst by hard-linking it (both must be on the same filesystem).
 *
 * @param type  unused
 * @return the result of taosLinkFile.
 */
int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) {
  int32_t rc = taosLinkFile(src, dst);
  return rc;
}
577

578
int32_t backendFileCopyFilesImpl(const char* src, const char* dst) {
17✔
579
  const char* current = "CURRENT";
17✔
580
  size_t      currLen = strlen(current);
17✔
581

582
  const char* info = "info";
17✔
583
  size_t      infoLen = strlen(info);
17✔
584

585
  int32_t code = 0;
17✔
586
  int32_t sLen = strlen(src);
17✔
587
  int32_t dLen = strlen(dst);
17✔
588
  int32_t cap = TMAX(sLen, dLen) + 64;
17✔
589
  int32_t nBytes = 0;
17✔
590

591
  char* srcName = taosMemoryCalloc(1, cap);
17✔
592
  char* dstName = taosMemoryCalloc(1, cap);
17✔
593
  if (srcName == NULL || dstName == NULL) {
17!
594
    taosMemoryFree(srcName);
×
595
    taosMemoryFree(dstName);
×
596
    return terrno;
×
597
  }
598

599
  // copy file to dst
600
  TdDirPtr pDir = taosOpenDir(src);
17✔
601
  if (pDir == NULL) {
17!
602
    code = terrno;
×
603
    goto _ERROR;
×
604
  }
605

606
  errno = 0;
17✔
607
  TdDirEntryPtr de = NULL;
17✔
608
  while ((de = taosReadDir(pDir)) != NULL) {
173✔
609
    char* name = taosGetDirEntryName(de);
156✔
610
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) {
156✔
611
      continue;
34✔
612
    }
613

614
    nBytes = snprintf(srcName, cap, "%s%s%s", src, TD_DIRSEP, name);
122✔
615
    if (nBytes <= 0 || nBytes >= cap) {
122!
616
      code = TSDB_CODE_OUT_OF_RANGE;
×
617
      goto _ERROR;
×
618
    }
619

620
    nBytes = snprintf(dstName, cap, "%s%s%s", dst, TD_DIRSEP, name);
122✔
621
    if (nBytes <= 0 || nBytes >= cap) {
122!
622
      code = TSDB_CODE_OUT_OF_RANGE;
×
623
      goto _ERROR;
×
624
    }
625

626
    if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) {
122✔
627
      code = copyFiles_create(srcName, dstName, 0);
17✔
628
      if (code != 0) {
17!
629
        code = TAOS_SYSTEM_ERROR(errno);
×
630
        stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
×
631
        goto _ERROR;
×
632
      }
633
    } else if (strncmp(name, info, strlen(name) <= infoLen ? strlen(name) : infoLen) == 0) {
105✔
634
      code = copyFiles_create(srcName, dstName, 0);
17✔
635
      if (code != 0) {
17!
636
        code = TAOS_SYSTEM_ERROR(errno);
×
637
        stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
×
638
        goto _ERROR;
×
639
      }
640

641
    } else {
642
      code = copyFiles_hardlink(srcName, dstName, 0);
88✔
643
      if (code != 0) {
88!
644
        code = TAOS_SYSTEM_ERROR(errno);
×
645
        stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code));
×
646
        goto _ERROR;
×
647
      } else {
648
        stDebug("succ hard link file:%s to %s", srcName, dstName);
88✔
649
      }
650
    }
651

652
    memset(srcName, 0, cap);
122✔
653
    memset(dstName, 0, cap);
122✔
654
  }
655

656
  taosMemoryFreeClear(srcName);
17!
657
  taosMemoryFreeClear(dstName);
17!
658
  TAOS_UNUSED(taosCloseDir(&pDir));
17✔
659
  return code;
17✔
660

661
_ERROR:
×
662
  taosMemoryFreeClear(srcName);
×
663
  taosMemoryFreeClear(dstName);
×
664
  TAOS_UNUSED(taosCloseDir(&pDir));
×
665
  return code;
×
666
}
667

668
/* Copy the content of directory src into dst; forwards to backendFileCopyFilesImpl. */
int32_t backendCopyFiles(const char* src, const char* dst) {
  int32_t rc = backendFileCopyFilesImpl(src, dst);
  return rc;
}
17✔
669

670
static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId,
27✔
671
                                          const char* defaultPath, int64_t* processVer) {
672
  int32_t code = 0;
27✔
673
  cleanDir(defaultPath, pTaskIdStr);
27✔
674

675
  if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) {
27!
676
    stDebug("%s local checkpoint data existed, checkpointId:%" PRId64 " copy to backend dir", pTaskIdStr, checkpointId);
17✔
677

678
    code = backendCopyFiles(checkpointPath, defaultPath);
17✔
679
    if (code != TSDB_CODE_SUCCESS) {
17!
680
      cleanDir(defaultPath, pTaskIdStr);
×
681
      stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote",
×
682
              pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(code)));
683
      code = TSDB_CODE_SUCCESS;
×
684
    } else {
685
      stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath,
17!
686
             defaultPath);
687
    }
688
  } else {
689
    code = terrno;
10✔
690
    stError("%s no valid data for checkpointId:%" PRId64 " in %s", pTaskIdStr, checkpointId, checkpointPath);
10!
691
  }
692

693
  return code;
27✔
694
}
695

696
int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath,
2,726✔
697
                              int64_t* processVer) {
698
  int32_t code = 0;
2,726✔
699

700
  char* prefixPath = NULL;
2,726✔
701
  char* defaultPath = NULL;
2,726✔
702
  char* checkpointPath = NULL;
2,726✔
703
  char* checkpointRoot = NULL;
2,726✔
704

705
  int32_t cap = strlen(path) + 128;
2,726✔
706
  int32_t nBytes;
707

708
  // alloc buf
709
  prefixPath = taosMemoryCalloc(1, cap);
2,726✔
710
  defaultPath = taosMemoryCalloc(1, cap);
2,727✔
711
  checkpointPath = taosMemoryCalloc(1, cap);
2,727✔
712
  checkpointRoot = taosMemoryCalloc(1, cap);
2,727✔
713
  if (prefixPath == NULL || defaultPath == NULL || checkpointPath == NULL || checkpointRoot == NULL) {
2,727!
714
    code = terrno;
×
715
    goto _EXIT;
×
716
  }
717

718
  nBytes = snprintf(prefixPath, cap, "%s%s%s", path, TD_DIRSEP, key);
2,727✔
719
  if (nBytes <= 0 || nBytes >= cap) {
2,727!
720
    code = TSDB_CODE_OUT_OF_RANGE;
×
721
    goto _EXIT;
×
722
  }
723

724
  code = createDirIfNotExist(prefixPath);
2,727✔
725
  if (code != 0) {
2,726!
726
    code = TAOS_SYSTEM_ERROR(errno);
×
727
    goto _EXIT;
×
728
  }
729

730
  nBytes = snprintf(defaultPath, cap, "%s%s%s", prefixPath, TD_DIRSEP, "state");
2,726✔
731
  if (nBytes <= 0 || nBytes >= cap) {
2,726!
732
    code = TSDB_CODE_OUT_OF_RANGE;
×
733
    goto _EXIT;
×
734
  }
735

736
  code = createDirIfNotExist(defaultPath);
2,727✔
737
  if (code != 0) {
2,726!
738
    code = TAOS_SYSTEM_ERROR(errno);
×
739
    goto _EXIT;
×
740
  }
741

742
  nBytes = snprintf(checkpointRoot, cap, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints");
2,726✔
743
  if (nBytes <= 0 || nBytes >= cap) {
2,726!
744
    code = TSDB_CODE_OUT_OF_RANGE;
×
745
    goto _EXIT;
×
746
  }
747

748
  code = createDirIfNotExist(checkpointRoot);
2,726✔
749
  if (code != 0) {
2,726!
750
    code = TAOS_SYSTEM_ERROR(errno);
×
751
    goto _EXIT;
×
752
  }
753

754
  stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId);
2,726✔
755
  if (chkptId > 0) {
2,727✔
756
    nBytes = snprintf(checkpointPath, cap, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP,
27✔
757
                      "checkpoint", chkptId);
758
    if (nBytes <= 0 || nBytes >= cap) {
27!
759
      code = TSDB_CODE_OUT_OF_RANGE;
×
760
      goto _EXIT;
×
761
    }
762

763
    code = rebuildFromLocalCheckpoint(key, checkpointPath, chkptId, defaultPath, processVer);
27✔
764
    if (code != 0) {
27✔
765
      code = rebuildFromRemoteCheckpoint(key, checkpointPath, chkptId, defaultPath);
10✔
766
    }
767

768
    if (code != 0) {
27✔
769
      stError("failed to start stream backend at %s, restart from defaultPath:%s, reason:%s", checkpointPath,
10!
770
              defaultPath, tstrerror(code));
771
      code = 0;  // reset the error code
10✔
772
    }
773
  } else {  // no valid checkpoint id
774
    stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key,
2,700!
775
           defaultPath);
776
    cleanDir(defaultPath, key);
2,700✔
777
  }
778

779
  *dbPath = defaultPath;
2,727✔
780
  *dbPrefixPath = prefixPath;
2,727✔
781
  defaultPath = NULL;
2,727✔
782
  prefixPath = NULL;
2,727✔
783

784
  code = 0;
2,727✔
785

786
_EXIT:
2,726✔
787
  taosMemoryFree(defaultPath);
2,726✔
788
  taosMemoryFree(prefixPath);
2,727✔
789
  taosMemoryFree(checkpointPath);
2,727✔
790
  taosMemoryFree(checkpointRoot);
2,727✔
791
  return code;
2,727✔
792
}
793
bool streamBackendDataIsExist(const char* path, int64_t chkpId) {
36✔
794
  bool    exist = true;
36✔
795
  int32_t cap = strlen(path) + 32;
36✔
796

797
  char* state = taosMemoryCalloc(1, cap);
36✔
798
  if (state == NULL) {
36!
799
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
800
    return false;
×
801
  }
802

803
  int16_t nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state");
36✔
804
  if (nBytes <= 0 || nBytes >= cap) {
36!
805
    terrno = TSDB_CODE_OUT_OF_RANGE;
×
806
    exist = false;
×
807
  } else {
808
    if (!taosDirExist(state)) {
36!
809
      exist = false;
36✔
810
    }
811
  }
812

813
  taosMemoryFree(state);
36✔
814
  return exist;
36✔
815
}
816

817
/*
 * Create a stream backend wrapper on top of a rocksdb instance located under `streamPath`.
 *
 * streamPath: root directory handed to rebuildDirFromCheckpoint().
 * chkpId:     checkpoint id handed to rebuildDirFromCheckpoint().
 * vgId:       only used in log messages.
 * pBackend:   receives the new wrapper on success; set to NULL on entry and left NULL on failure.
 *
 * Returns 0 on success, otherwise an error code. Every resource acquired so far is released
 * on failure.
 */
int32_t streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId, SBackendWrapper** pBackend) {
  char*    backendPath = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;
  char*    err = NULL;
  size_t   nCf = 0;
  char**   cfs = NULL;
  uint32_t dbMemLimit = 0;

  // Everything released at _EXIT is declared and nulled before the first goto, so the
  // cleanup code never sees an indeterminate pointer.
  SBackendWrapper*   pHandle = NULL;
  rocksdb_env_t*     env = NULL;
  rocksdb_cache_t*   cache = NULL;
  rocksdb_options_t* opts = NULL;
  bool               mutexInited = false;
  bool               cfMutexInited = false;

  *pBackend = NULL;

  code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
  TSDB_CHECK_CODE(code, lino, _EXIT);

  stDebug("start to init stream backend:%s, checkpointId:%" PRId64 " vgId:%d", backendPath, chkpId, vgId);

  dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;

  pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
  TSDB_CHECK_NULL(pHandle, code, lino, _EXIT, terrno);

  pHandle->list = tdListNew(sizeof(SCfComparator));
  TSDB_CHECK_NULL(pHandle->list, code, lino, _EXIT, terrno);

  code = taosThreadMutexInit(&pHandle->mutex, NULL);
  TSDB_CHECK_CODE(code, lino, _EXIT);
  mutexInited = true;

  code = taosThreadMutexInit(&pHandle->cfMutex, NULL);
  TSDB_CHECK_CODE(code, lino, _EXIT);
  cfMutexInited = true;

  pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pHandle->cfInst, code, lino, _EXIT, terrno);

  env = rocksdb_create_default_env();  // rocksdb_envoptions_create();

  int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2;
  rocksdb_env_set_low_priority_background_threads(env, nBGThread);
  rocksdb_env_set_high_priority_background_threads(env, nBGThread);

  cache = rocksdb_cache_create_lru(dbMemLimit / 2);

  opts = rocksdb_options_create();
  rocksdb_options_set_env(opts, env);
  rocksdb_options_set_create_if_missing(opts, 1);
  rocksdb_options_set_create_missing_column_families(opts, 1);
  rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
  rocksdb_options_set_recycle_log_file_num(opts, 6);
  rocksdb_options_set_max_write_buffer_number(opts, 3);
  rocksdb_options_set_info_log_level(opts, 1);
  rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit);
  rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2);
  rocksdb_options_set_atomic_flush(opts, 1);

  pHandle->env = env;
  pHandle->dbOpt = opts;
  pHandle->cache = cache;
  pHandle->filterFactory = rocksdb_compactionfilterfactory_create(
      NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
  rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory);

  cfs = rocksdb_list_column_families(opts, backendPath, &nCf, &err);
  if (nCf == 0 || nCf == 1 || err != NULL) {
    // nothing but (at most) the default column family: open the db directly
    taosMemoryFreeClear(err);
    pHandle->db = rocksdb_open(opts, backendPath, &err);
    if (err != NULL) {
      stError("failed to open rocksdb, path:%s, reason:%s", backendPath, err);
      taosMemoryFreeClear(err);
      // report the failure to the caller instead of returning 0 with a NULL backend
      code = TSDB_CODE_THIRDPARTY_ERROR;
      lino = __LINE__;
      goto _EXIT;
    }
  } else {
    /*
      list all cf and get prefix
    */
    code = streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf);
    if (code != 0) {
      lino = __LINE__;
      goto _EXIT;
    }
  }

  if (cfs != NULL) {
    rocksdb_list_column_families_destroy(cfs, nCf);
    cfs = NULL;
  }

  stDebug("init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId);
  taosMemoryFreeClear(backendPath);

  *pBackend = pHandle;
  return code;

_EXIT:
  if (cfs != NULL) {
    rocksdb_list_column_families_destroy(cfs, nCf);
  }
  if (opts != NULL) {
    rocksdb_options_destroy(opts);
  }
  if (cache != NULL) {
    rocksdb_cache_destroy(cache);
  }
  if (env != NULL) {
    rocksdb_env_destroy(env);
  }
  if (pHandle != NULL) {
    // only destroy the mutexes that were actually initialised
    if (mutexInited) {
      streamMutexDestroy(&pHandle->mutex);
    }
    if (cfMutexInited) {
      streamMutexDestroy(&pHandle->cfMutex);
    }
    taosHashCleanup(pHandle->cfInst);
    pHandle->list = tdListFree(pHandle->list);
    taosMemoryFree(pHandle);
  }
  stDebug("failed to init stream backend at %s, vgId:%d line:%d code:%s", backendPath != NULL ? backendPath : "(null)",
          vgId, lino, tstrerror(code));
  taosMemoryFree(backendPath);
  return code;
}
918
void streamBackendCleanup(void* arg) {
×
919
  SBackendWrapper* pHandle = (SBackendWrapper*)arg;
×
920

921
  void* pIter = taosHashIterate(pHandle->cfInst, NULL);
×
922
  while (pIter != NULL) {
×
923
    RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
×
924
    destroyRocksdbCfInst(inst);
×
925
    pIter = taosHashIterate(pHandle->cfInst, pIter);
×
926
  }
927

928
  taosHashCleanup(pHandle->cfInst);
×
929

930
  if (pHandle->db) {
×
931
    rocksdb_close(pHandle->db);
×
932
    pHandle->db = NULL;
×
933
  }
934
  rocksdb_options_destroy(pHandle->dbOpt);
×
935
  rocksdb_env_destroy(pHandle->env);
×
936
  rocksdb_cache_destroy(pHandle->cache);
×
937

938
  SListNode* head = tdListPopHead(pHandle->list);
×
939
  while (head != NULL) {
×
940
    streamStateDestroyCompar(head->data);
×
941
    taosMemoryFree(head);
×
942
    head = tdListPopHead(pHandle->list);
×
943
  }
944

945
  pHandle->list = tdListFree(pHandle->list);
×
946
  streamMutexDestroy(&pHandle->mutex);
×
947

948
  streamMutexDestroy(&pHandle->cfMutex);
×
949
  stDebug("destroy stream backend :%p", pHandle);
×
950
  taosMemoryFree(pHandle);
×
951
  return;
×
952
}
953
void streamBackendHandleCleanup(void* arg) {
×
954
  SBackendCfWrapper* wrapper = arg;
×
955
  bool               remove = wrapper->remove;
×
956
  TAOS_UNUSED(taosThreadRwlockWrlock(&wrapper->rwLock));
×
957

958
  stDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
×
959
  if (wrapper->rocksdb == NULL) {
×
960
    TAOS_UNUSED(taosThreadRwlockUnlock(&wrapper->rwLock));
×
961
    return;
×
962
  }
963

964
  int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
×
965

966
  char* err = NULL;
×
967
  if (remove) {
×
968
    for (int i = 0; i < cfLen; i++) {
×
969
      if (wrapper->pHandle[i] != NULL) rocksdb_drop_column_family(wrapper->rocksdb, wrapper->pHandle[i], &err);
×
970
      if (err != NULL) {
×
971
        stError("failed to drop cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
×
972
        taosMemoryFreeClear(err);
×
973
      }
974
    }
975
  } else {
976
    rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
×
977
    rocksdb_flushoptions_set_wait(flushOpt, 1);
×
978

979
    for (int i = 0; i < cfLen; i++) {
×
980
      if (wrapper->pHandle[i] != NULL) rocksdb_flush_cf(wrapper->rocksdb, flushOpt, wrapper->pHandle[i], &err);
×
981
      if (err != NULL) {
×
982
        stError("failed to flush cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
×
983
        taosMemoryFreeClear(err);
×
984
      }
985
    }
986
    rocksdb_flushoptions_destroy(flushOpt);
×
987
  }
988

989
  for (int i = 0; i < cfLen; i++) {
×
990
    if (wrapper->pHandle[i] != NULL) {
×
991
      rocksdb_column_family_handle_destroy(wrapper->pHandle[i]);
×
992
    }
993
  }
994
  taosMemoryFreeClear(wrapper->pHandle);
×
995

996
  for (int i = 0; i < cfLen; i++) {
×
997
    rocksdb_options_destroy(wrapper->cfOpts[i]);
×
998
    rocksdb_block_based_options_destroy(((RocksdbCfParam*)wrapper->param)[i].tableOpt);
×
999
  }
1000

1001
  if (remove) {
×
1002
    streamBackendDelCompare(wrapper->pBackend, wrapper->pComparNode);
×
1003
  }
1004
  rocksdb_writeoptions_destroy(wrapper->writeOpts);
×
1005
  wrapper->writeOpts = NULL;
×
1006

1007
  rocksdb_readoptions_destroy(wrapper->readOpts);
×
1008
  wrapper->readOpts = NULL;
×
1009
  taosMemoryFreeClear(wrapper->cfOpts);
×
1010
  taosMemoryFreeClear(wrapper->param);
×
1011
  TAOS_UNUSED(taosThreadRwlockUnlock(&wrapper->rwLock));
×
1012

1013
  TAOS_UNUSED(taosThreadRwlockDestroy(&wrapper->rwLock));
×
1014
  wrapper->rocksdb = NULL;
×
1015
  // taosReleaseRef(streamBackendId, wrapper->backendId);
1016

1017
  stDebug("end to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
×
1018
  taosMemoryFree(wrapper);
×
1019
  return;
×
1020
}
1021

1022
#ifdef BUILD_NO_CALL
1023
int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) {
1024
  SStreamMeta* pMeta = arg;
1025
  taosWLockLatch(&pMeta->chkpDirLock);
1026
  int64_t tc = 0;
1027
  int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
1028
  if (sz <= 0) {
1029
    taosWUnLockLatch(&pMeta->chkpDirLock);
1030
    return -1;
1031
  } else {
1032
    tc = *(int64_t*)taosArrayGetLast(pMeta->chkpSaved);
1033
  }
1034

1035
  taosArrayPush(pMeta->chkpInUse, &tc);
1036

1037
  *checkpoint = tc;
1038
  taosWUnLockLatch(&pMeta->chkpDirLock);
1039
  return 0;
1040
}
1041
/*
1042
 *  checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
1043
 *  chkpInUse: |--cp2--|--cp4--|
1044
 *  chkpInUse is doing translation, cannot del until
1045
 *  replication is finished
1046
 */
1047
int32_t delObsoleteCheckpoint(void* arg, const char* path) {
1048
  SStreamMeta* pMeta = arg;
1049

1050
  taosWLockLatch(&pMeta->chkpDirLock);
1051

1052
  SArray* chkpDel = taosArrayInit(10, sizeof(int64_t));
1053
  SArray* chkpDup = taosArrayInit(10, sizeof(int64_t));
1054

1055
  int64_t firsId = 0;
1056
  if (taosArrayGetSize(pMeta->chkpInUse) >= 1) {
1057
    firsId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
1058

1059
    for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) {
1060
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1061
      if (id >= firsId) {
1062
        taosArrayPush(chkpDup, &id);
1063
      } else {
1064
        taosArrayPush(chkpDel, &id);
1065
      }
1066
    }
1067
  } else {
1068
    int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
1069
    int32_t dsz = sz - pMeta->chkpCap;  // del size
1070

1071
    for (int i = 0; i < dsz; i++) {
1072
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1073
      taosArrayPush(chkpDel, &id);
1074
    }
1075
    for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
1076
      int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
1077
      taosArrayPush(chkpDup, &id);
1078
    }
1079
  }
1080
  taosArrayDestroy(pMeta->chkpSaved);
1081
  pMeta->chkpSaved = chkpDup;
1082

1083
  taosWUnLockLatch(&pMeta->chkpDirLock);
1084

1085
  for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
1086
    int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
1087
    char    tbuf[256] = {0};
1088
    sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id);
1089
    if (taosIsDir(tbuf)) {
1090
      taosRemoveDir(tbuf);
1091
    }
1092
  }
1093
  taosArrayDestroy(chkpDel);
1094
  return 0;
1095
}
1096
#endif
1097
/*
1098
 *  checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
1099
 *  chkpInUse: |--cp2--|--cp4--|
1100
 *  chkpInUse is doing translation, cannot del until
1101
 *  replication is finished
1102
 */
1103
int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
1,363✔
1104
  int32_t         code = 0;
1,363✔
1105
  STaskDbWrapper* pBackend = arg;
1,363✔
1106
  SArray *        chkpDel = NULL, *chkpDup = NULL;
1,363✔
1107
  TAOS_UNUSED(taosThreadRwlockWrlock(&pBackend->chkpDirLock));
1,363✔
1108

1109
  if (taosArrayPush(pBackend->chkpSaved, &chkpId) == NULL) {
2,726!
1110
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1111
  }
1112

1113
  chkpDel = taosArrayInit(8, sizeof(int64_t));
1,363✔
1114
  if (chkpDel == NULL) {
1,363!
1115
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1116
  }
1117

1118
  chkpDup = taosArrayInit(8, sizeof(int64_t));
1,363✔
1119
  if (chkpDup == NULL) {
1,363!
1120
    TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1121
  }
1122

1123
  int64_t firsId = 0;
1,363✔
1124
  if (taosArrayGetSize(pBackend->chkpInUse) >= 1) {
1,363!
1125
    firsId = *(int64_t*)taosArrayGet(pBackend->chkpInUse, 0);
×
1126

1127
    for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) {
×
1128
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
×
1129
      if (id >= firsId) {
×
1130
        if (taosArrayPush(chkpDup, &id) == NULL) {
×
1131
          TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1132
        }
1133
      } else {
1134
        if (taosArrayPush(chkpDel, &id) == NULL) {
×
1135
          TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1136
        }
1137
      }
1138
    }
1139
  } else {
1140
    int32_t sz = taosArrayGetSize(pBackend->chkpSaved);
1,363✔
1141
    int32_t dsz = sz - pBackend->chkpCap;  // del size
1,363✔
1142

1143
    for (int i = 0; i < dsz; i++) {
1,363!
1144
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
×
1145
      if (taosArrayPush(chkpDel, &id) == NULL) {
×
1146
        TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1147
      }
1148
    }
1149
    for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
2,778✔
1150
      int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
1,415✔
1151
      if (taosArrayPush(chkpDup, &id) == NULL) {
1,415!
1152
        TAOS_CHECK_GOTO(terrno, NULL, _exception);
×
1153
      }
1154
    }
1155
  }
1156

1157
  taosArrayDestroy(pBackend->chkpSaved);
1,363✔
1158
  pBackend->chkpSaved = chkpDup;
1,363✔
1159

1160
  TAOS_UNUSED(taosThreadRwlockUnlock(&pBackend->chkpDirLock));
1,363✔
1161

1162
  for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
1,363!
1163
    int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
×
1164
    char    tbuf[256] = {0};
×
1165
    sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id);
×
1166

1167
    stInfo("backend remove obsolete checkpoint: %s", tbuf);
×
1168
    if (taosIsDir(tbuf)) {
×
1169
      taosRemoveDir(tbuf);
×
1170
    }
1171
  }
1172
  taosArrayDestroy(chkpDel);
1,363✔
1173
  return 0;
1,363✔
1174
_exception:
×
1175
  taosArrayDestroy(chkpDup);
×
1176
  taosArrayDestroy(chkpDel);
×
1177
  TAOS_UNUSED(taosThreadRwlockUnlock(&pBackend->chkpDirLock));
×
1178
  return code;
×
1179
}
1180

1181
/*
 * Three-way comparator for two int64_t checkpoint ids, suitable for sorting in ascending
 * order.
 *
 * a, b: pointers to the int64_t values to compare.
 *
 * Returns -1 when *a < *b, 0 when they are equal and 1 when *a > *b.
 */
int chkpIdComp(const void* a, const void* b) {
  const int64_t lhs = *(int64_t*)a;
  const int64_t rhs = *(int64_t*)b;

  // comparing instead of subtracting keeps the result exact for the whole int64_t range
  return (lhs > rhs) - (lhs < rhs);
}
1188
/*
 * Scan <pBackend->path>/checkpoints and collect the id of every sub-directory named
 * "checkpoint<id>" into pBackend->chkpSaved, then sort the array in ascending order.
 *
 * Returns 0 on success, and also when the checkpoints directory is missing or cannot be
 * opened. Returns an error code when memory allocation fails or the directory path cannot
 * be formatted.
 */
int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
  int32_t code = 0;

  // size the buffer from the real path length rather than assuming it fits into 256 bytes
  int32_t cap = strlen(pBackend->path) + 64;
  char*   pChkpDir = taosMemoryCalloc(1, cap);
  if (pChkpDir == NULL) {
    return terrno;
  }

  int32_t nBytes = snprintf(pChkpDir, cap, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
  if (nBytes <= 0 || nBytes >= cap) {
    taosMemoryFree(pChkpDir);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  if (!taosIsDir(pChkpDir)) {
    taosMemoryFree(pChkpDir);
    return 0;
  }
  TdDirPtr pDir = taosOpenDir(pChkpDir);
  if (pDir == NULL) {
    taosMemoryFree(pChkpDir);
    return 0;
  }

  TdDirEntryPtr de = NULL;
  while ((de = taosReadDir(pDir)) != NULL) {
    if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;

    // only directories can be checkpoints
    if (!taosDirEntryIsDir(de)) continue;

    int64_t checkpointId = 0;
    int     ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId);
    if (ret == 1) {
      if (taosArrayPush(pBackend->chkpSaved, &checkpointId) == NULL) {
        TAOS_CHECK_GOTO(terrno, NULL, _exception);
      }
    }
  }
  taosArraySort(pBackend->chkpSaved, chkpIdComp);

  taosMemoryFree(pChkpDir);
  TAOS_UNUSED(taosCloseDir(&pDir));

  return 0;
_exception:
  taosMemoryFree(pChkpDir);
  TAOS_UNUSED(taosCloseDir(&pDir));
  return code;
}
1234
/*
 * Collect every non-NULL column-family handle of `pBackend` into a newly allocated array.
 *
 * pBackend: task db whose pCf slots are scanned (one slot per ginitDict entry).
 * ppHandle: receives the array when at least one handle exists; the caller frees it with
 *           taosMemoryFree(). It is left untouched when the function returns <= 0.
 *
 * Returns the number of handles stored (0 when there are none), or terrno when the array
 * cannot be allocated.
 */
int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
  const int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);

  // Room for every slot is reserved up front, so the handles are gathered in one pass and
  // no growable temporary array (with its own unchecked allocation) is needed.
  rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
  if (ppCf == NULL) {
    return terrno;
  }

  int32_t nCf = 0;
  for (int32_t i = 0; i < cfLen; i++) {
    if (pBackend->pCf[i]) {
      ppCf[nCf++] = pBackend->pCf[i];
    }
  }

  if (nCf == 0) {
    taosMemoryFree(ppCf);
    return 0;
  }

  *ppHandle = ppCf;
  return nCf;
}
1268

1269
/*
 * Create a rocksdb checkpoint of `db` in directory `path`.
 *
 * Returns 0 on success and TSDB_CODE_THIRDPARTY_ERROR when either the checkpoint object or
 * the checkpoint itself cannot be created.
 */
int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
  int32_t code = 0;
  char*   err = NULL;

  rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(db, &err);
  if (cp == NULL || err != NULL) {
    stError("failed to do checkpoint at:%s, reason:%s", path, err != NULL ? err : "unknown");
    taosMemoryFreeClear(err);
    // RocksDB's C wrapper dereferences the object in its destroy call, so it must never be
    // handed a NULL checkpoint object
    if (cp != NULL) {
      rocksdb_checkpoint_object_destroy(cp);
    }
    return TSDB_CODE_THIRDPARTY_ERROR;
  }

  rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err);
  if (err != NULL) {
    stError("failed to do checkpoint at:%s, reason:%s", path, err);
    taosMemoryFreeClear(err);
    code = TSDB_CODE_THIRDPARTY_ERROR;
  }

  rocksdb_checkpoint_object_destroy(cp);
  return code;
}
1291

1292
/*
 * Flush the given column families of `db` and wait for the flush to finish.
 *
 * cf:  array of column-family handles to flush.
 * nCf: number of entries in `cf`.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the flush options cannot be created
 * and TSDB_CODE_THIRDPARTY_ERROR when rocksdb reports an error.
 */
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
  rocksdb_flushoptions_t* pFlushOpt = rocksdb_flushoptions_create();
  if (pFlushOpt == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  rocksdb_flushoptions_set_wait(pFlushOpt, 1);

  int   result = 0;
  char* errMsg = NULL;
  rocksdb_flush_cfs(db, pFlushOpt, cf, nCf, &errMsg);
  if (errMsg != NULL) {
    stError("failed to flush db before streamBackend clean up, reason:%s", errMsg);
    taosMemoryFree(errMsg);
    result = TSDB_CODE_THIRDPARTY_ERROR;
  }

  rocksdb_flushoptions_destroy(pFlushOpt);
  return result;
}
1312

1313
int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) {
1,363✔
1314
  int32_t code = 0;
1,363✔
1315
  int32_t cap = strlen(path) + 256;
1,363✔
1316
  int32_t nBytes = 0;
1,363✔
1317

1318
  char* pChkpDir = taosMemoryCalloc(1, cap);
1,363✔
1319
  char* pChkpIdDir = taosMemoryCalloc(1, cap);
1,363✔
1320
  if (pChkpDir == NULL || pChkpIdDir == NULL) {
1,363!
1321
    code = terrno;
×
1322
    goto _EXIT;
×
1323
  }
1324

1325
  nBytes = snprintf(pChkpDir, cap, "%s%s%s", path, TD_DIRSEP, "checkpoints");
1,363✔
1326
  if (nBytes <= 0 || nBytes >= cap) {
1,363!
1327
    code = TSDB_CODE_OUT_OF_RANGE;
×
1328
    goto _EXIT;
×
1329
  }
1330

1331
  nBytes = snprintf(pChkpIdDir, cap, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId);
1,363✔
1332
  if (nBytes <= 0 || nBytes >= cap) {
1,363!
1333
    code = TSDB_CODE_OUT_OF_RANGE;
×
1334
    goto _EXIT;
×
1335
  }
1336

1337
  code = taosMulModeMkDir(pChkpDir, 0755, true);
1,363✔
1338
  if (code != 0) {
1,363!
1339
    code = terrno;
×
1340
    stError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
×
1341
    goto _EXIT;
×
1342
  }
1343

1344
  if (taosIsDir(pChkpIdDir)) {
1,363!
1345
    stInfo("stream rm exist checkpoint%s", pChkpIdDir);
×
1346
    taosRemoveDir(pChkpIdDir);
×
1347
  }
1348

1349
  *chkpDir = pChkpDir;
1,362✔
1350
  *chkpIdDir = pChkpIdDir;
1,362✔
1351
  return 0;
1,362✔
1352
_EXIT:
×
1353
  taosMemoryFree(pChkpDir);
×
1354
  taosMemoryFree(pChkpIdDir);
×
1355
  return code;
×
1356
}
1357

1358
/*
 * Create a checkpoint for every task db registered in pMeta->pTaskDbUnique and append one
 * SStreamTaskSnap per task db to `pSnap`.
 *
 * For each task db the current chkpId is marked as in use before the checkpoint is taken;
 * the mark is removed again when any later step for that task db fails. Task dbs whose
 * reference cannot be acquired are skipped. The whole walk runs under backendMutex.
 *
 * arg:   the SStreamMeta.
 * pSnap: array of SStreamTaskSnap to append to; each appended entry owns its dbPrefixPath.
 *
 * Returns 0 on success, otherwise the code of the first failure (the walk stops there).
 */
int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
  // vnode task->db
  SStreamMeta* pMeta = arg;

  streamMutexLock(&pMeta->backendMutex);
  void*   pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL);
  int32_t code = 0;

  // NOTE(review): the loop is left with `break` on failure; confirm whether the hash
  // iterator needs to be cancelled explicitly in that case.
  while (pIter) {
    STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;

    void* p = taskDbAddRef(pTaskDb);
    if (p == NULL) {
      terrno = 0;
      pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
      continue;
    }

    // add chkpId to in-use-ckpkIdSet
    taskDbRefChkp(pTaskDb, pTaskDb->chkpId);

    code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId, ((SStreamTask*)pTaskDb->pTask)->chkInfo.processedVer);
    if (code != 0) {
      // remove chkpId from in-use-ckpkIdSet
      taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
      taskDbRemoveRef(pTaskDb);
      break;
    }

    SStreamTask*    pTask = pTaskDb->pTask;
    SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
                            .taskId = pTask->id.taskId,
                            .chkpId = pTaskDb->chkpId,
                            .dbPrefixPath = taosStrdup(pTaskDb->path)};
    if (snap.dbPrefixPath == NULL) {
      // read terrno before the cleanup calls get a chance to overwrite it
      code = terrno;
      // remove chkpid from chkp-in-use set
      taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
      taskDbRemoveRef(pTaskDb);
      break;
    }
    if (taosArrayPush(pSnap, &snap) == NULL) {
      code = terrno;
      // the snapshot never made it into the array, so its path copy is still owned here
      taosMemoryFree(snap.dbPrefixPath);
      taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
      taskDbRemoveRef(pTaskDb);
      break;
    }

    taskDbRemoveRef(pTaskDb);
    pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
  }
  streamMutexUnlock(&pMeta->backendMutex);
  return code;
}
1412
/*
 * Release the in-use mark of every checkpoint referenced by `pSnapInfo`.
 *
 * For each snapshot the owning task db is looked up in pMeta->pTaskDbUnique by the key
 * "0x<streamId>-0x<taskId>"; snapshots whose task db is not found are logged and skipped.
 * The walk runs under backendMutex.
 *
 * Always returns 0 (also when `pSnapInfo` is NULL, in which case nothing is done).
 */
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
  if (pSnapInfo == NULL) {
    return 0;
  }

  SStreamMeta* pMeta = arg;
  streamMutexLock(&pMeta->backendMutex);

  char key[128] = {0};
  for (int idx = 0; idx < taosArrayGetSize(pSnapInfo); ++idx) {
    SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, idx);

    sprintf(key, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
    STaskDbWrapper** ppTaskDb = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));

    // the key is not needed once the lookup is done; wipe it for the next round
    memset(key, 0, sizeof(key));

    if (ppTaskDb != NULL && *ppTaskDb != NULL) {
      taskDbUnRefChkp(*ppTaskDb, pSnap->chkpId);
    } else {
      stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId);
    }
  }

  streamMutexUnlock(&pMeta->backendMutex);
  return 0;
}
1435
#ifdef BUILD_NO_CALL
1436
/*
 * No-op kept for interface compatibility; always returns 0.
 *
 * The disabled implementation appended `chkpId` to pMeta->chkpInUse while holding
 * pMeta->chkpDirLock (and returned 0 early for a NULL `arg`).
 */
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
  return 0;
}
1445
/*
 * No-op kept for interface compatibility; always returns 0.
 *
 * The disabled implementation removed the first entry of pMeta->chkpInUse when it matched
 * `chkpId`, while holding pMeta->chkpDirLock (and returned 0 early for a NULL `arg`).
 */
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
  return 0;
}
1459
#endif
1460

1461
/*
1462
   0
1463
*/
1464

1465
void* taskAcquireDb(int64_t refId) {
×
1466
  // acquire
1467
  void* p = taosAcquireRef(taskDbWrapperId, refId);
×
1468
  return p;
×
1469
}
1470
void taskReleaseDb(int64_t refId) {
×
1471
  // release
1472
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
×
1473
}
×
1474

1475
int64_t taskGetDBRef(void* arg) {
×
1476
  if (arg == NULL) return -1;
×
1477
  STaskDbWrapper* pDb = arg;
×
1478
  return pDb->refId;
×
1479
}
1480

1481
int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) {
2,727✔
1482
  TdFilePtr pFile = NULL;
2,727✔
1483
  int32_t   code = 0;
2,727✔
1484

1485
  char    buf[256] = {0};
2,727✔
1486
  int32_t nBytes = 0;
2,727✔
1487

1488
  int32_t len = strlen(pChkpIdDir);
2,727✔
1489
  if (len == 0) {
2,727!
1490
    code = TSDB_CODE_INVALID_PARA;
×
1491
    stError("failed to load extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1492
    return code;
×
1493
  }
1494

1495
  int32_t cap = len + 64;
2,727✔
1496
  char*   pDst = taosMemoryCalloc(1, cap);
2,727✔
1497
  if (pDst == NULL) {
2,727!
1498
    code = terrno;
×
1499
    stError("failed to alloc memory to load extra info, dir:%s", pChkpIdDir);
×
1500
    goto _EXIT;
×
1501
  }
1502

1503
  nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
2,727✔
1504
  if (nBytes <= 0 || nBytes >= cap) {
2,727!
1505
    code = TSDB_CODE_OUT_OF_RANGE;
×
1506
    stError("failed to build dst to load extra info, dir:%s", pChkpIdDir);
×
1507
    goto _EXIT;
×
1508
  }
1509

1510
  pFile = taosOpenFile(pDst, TD_FILE_READ);
2,727✔
1511
  if (pFile == NULL) {
2,726✔
1512
    // compatible with previous version
1513
    *processId = -1;
2,709✔
1514
    code = 0;
2,709✔
1515
    stWarn("failed to open file to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno));
2,709!
1516
    goto _EXIT;
2,710✔
1517
  }
1518

1519
  if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
17!
1520
    code = terrno;
×
1521
    stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1522
    goto _EXIT;
×
1523
  }
1524

1525
  if (sscanf(buf, "%" PRId64 " %" PRId64 "", chkpId, processId) < 2) {
17!
1526
    code = TSDB_CODE_INVALID_PARA;
×
1527
    stError("failed to read file content to load extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1528
    goto _EXIT;
×
1529
  }
1530
  code = 0;
17✔
1531
_EXIT:
2,727✔
1532
  taosMemoryFree(pDst);
2,727✔
1533
  TAOS_UNUSED(taosCloseFile(&pFile));
2,727✔
1534
  return code;
2,727✔
1535
}
1536
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
1,363✔
1537
  int32_t code = 0;
1,363✔
1538

1539
  TdFilePtr pFile = NULL;
1,363✔
1540

1541
  char    buf[256] = {0};
1,363✔
1542
  int32_t nBytes = 0;
1,363✔
1543

1544
  int32_t len = strlen(pChkpIdDir);
1,363✔
1545
  if (len == 0) {
1,363!
1546
    code = TSDB_CODE_INVALID_PARA;
×
1547
    stError("failed to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1548
    return code;
×
1549
  }
1550

1551
  int32_t cap = len + 64;
1,363✔
1552
  char*   pDst = taosMemoryCalloc(1, cap);
1,363✔
1553
  if (pDst == NULL) {
1,363!
1554
    code = terrno;
×
1555
    stError("failed to alloc memory to add extra info, dir:%s", pChkpIdDir);
×
1556
    goto _EXIT;
×
1557
  }
1558

1559
  nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
1,363✔
1560
  if (nBytes <= 0 || nBytes >= cap) {
1,363!
1561
    code = TSDB_CODE_OUT_OF_RANGE;
×
1562
    stError("failed to build dst to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code));
×
1563
    goto _EXIT;
×
1564
  }
1565

1566
  pFile = taosOpenFile(pDst, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
1,363✔
1567
  if (pFile == NULL) {
1,363!
1568
    code = terrno;
×
1569
    stError("failed to open file to add extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1570
    goto _EXIT;
×
1571
  }
1572

1573
  nBytes = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId);
1,363✔
1574
  if (nBytes <= 0 || nBytes >= sizeof(buf)) {
1,363!
1575
    code = TSDB_CODE_OUT_OF_RANGE;
×
1576
    stError("failed to build content to add extra info, dir:%s,reason:%s", pChkpIdDir, tstrerror(code));
×
1577
    goto _EXIT;
×
1578
  }
1579

1580
  if (nBytes != taosWriteFile(pFile, buf, nBytes)) {
1,363!
1581
    code = terrno;
×
1582
    stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(code));
×
1583
    goto _EXIT;
×
1584
  }
1585
  code = 0;
1,363✔
1586

1587
_EXIT:
1,363✔
1588
  TAOS_UNUSED(taosCloseFile(&pFile));
1,363✔
1589
  taosMemoryFree(pDst);
1,363✔
1590
  return code;
1,363✔
1591
}
1592
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId) {
1,363✔
1593
  STaskDbWrapper* pTaskDb = arg;
1,363✔
1594
  int64_t         st = taosGetTimestampMs();
1,363✔
1595
  int32_t         code = 0;
1,363✔
1596
  int64_t         refId = pTaskDb->refId;
1,363✔
1597

1598
  if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
1,363!
1599
    code = terrno;
×
1600
    return code;
×
1601
  }
1602

1603
  char* pChkpDir = NULL;
1,363✔
1604
  char* pChkpIdDir = NULL;
1,363✔
1605
  if ((code = chkpPreBuildDir(pTaskDb->path, chkpId, &pChkpDir, &pChkpIdDir)) < 0) {
1,363!
1606
    goto _EXIT;
×
1607
  }
1608
  // Get all cf and acquire cfWrappter
1609
  rocksdb_column_family_handle_t** ppCf = NULL;
1,362✔
1610

1611
  int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
1,362✔
1612
  stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
1,363✔
1613

1614
  int64_t written = atomic_load_64(&pTaskDb->dataWritten);
1,363✔
1615

1616
  // flush db
1617
  if (written > 0) {
1,363✔
1618
    stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
1,159✔
1619
    code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf);
1,159✔
1620
    if (code != 0) goto _EXIT;
1,159!
1621
  } else {
1622
    stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
204!
1623
  }
1624

1625
  // do checkpoint
1626
  if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
1,363!
1627
    stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
×
1628
    goto _EXIT;
×
1629
  } else {
1630
    stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
2,720✔
1631
            taosGetTimestampMs() - st);
1632
  }
1633

1634
  // add extra info to checkpoint
1635
  if ((code = chkpAddExtraInfo(pChkpIdDir, chkpId, processId)) != 0) {
1,363!
1636
    stError("stream backend:%p failed to add extra info to checkpoint at:%s", pTaskDb, pChkpIdDir);
×
1637
    goto _EXIT;
×
1638
  }
1639

1640
  // delete ttl checkpoint
1641
  code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
1,363✔
1642
  if (code < 0) {
1,363!
1643
    goto _EXIT;
×
1644
  }
1645

1646
  TAOS_UNUSED(atomic_store_64(&pTaskDb->dataWritten, 0));
1,363✔
1647
  pTaskDb->chkpId = chkpId;
1,363✔
1648

1649
_EXIT:
1,363✔
1650

1651
  // clear checkpoint dir if failed
1652
  if (code != 0 && pChkpDir != NULL) {
1,363!
1653
    if (taosDirExist(pChkpIdDir)) {
×
1654
      TAOS_UNUSED(taosRemoveDir(pChkpIdDir));
×
1655
    }
1656
  }
1657
  taosMemoryFree(pChkpIdDir);
1,363✔
1658
  taosMemoryFree(pChkpDir);
1,363✔
1659

1660
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
1,363✔
1661
  taosMemoryFree(ppCf);
1,363✔
1662
  return code;
1,363✔
1663
}
1664

1665
/*
 * Thin wrapper around taskDbDoCheckpoint(): creates checkpoint `chkpId` for the task db
 * passed in `arg` and returns its result unchanged.
 */
int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId, int64_t processVer) {
  int32_t code = taskDbDoCheckpoint(arg, chkpId, processVer);
  return code;
}
1668

1669
SListNode* streamBackendAddCompare(void* backend, void* arg) {
×
1670
  SBackendWrapper* pHandle = (SBackendWrapper*)backend;
×
1671
  SListNode*       node = NULL;
×
1672
  streamMutexLock(&pHandle->mutex);
×
1673
  node = tdListAdd(pHandle->list, arg);
×
1674
  streamMutexUnlock(&pHandle->mutex);
×
1675
  return node;
×
1676
}
1677
void streamBackendDelCompare(void* backend, void* arg) {
×
1678
  SBackendWrapper* pHandle = (SBackendWrapper*)backend;
×
1679
  SListNode*       node = NULL;
×
1680
  streamMutexLock(&pHandle->mutex);
×
1681
  node = tdListPopNode(pHandle->list, arg);
×
1682
  streamMutexUnlock(&pHandle->mutex);
×
1683
  if (node) {
×
1684
    streamStateDestroyCompar(node->data);
×
1685
    taosMemoryFree(node);
×
1686
  }
1687
}
×
1688
#ifdef BUILD_NO_CALL
1689
// Thin wrapper: closing the backend is the whole of destroying the state.
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) {
  streamStateCloseBackend(pState, remove);
}
1690
#endif
1691
/*
 * Release everything owned by a RocksdbCfInst and then the instance itself.
 *
 * The handle, option and param arrays are all walked with one entry per
 * element of ginitDict. `inst` must not be NULL.
 */
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
  const int nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);

  // column family handles: destroy each non-NULL handle, then the array
  if (inst->pHandle != NULL) {
    for (int idx = 0; idx < nCf; ++idx) {
      if (inst->pHandle[idx] != NULL) {
        rocksdb_column_family_handle_destroy(inst->pHandle[idx]);
      }
    }
    taosMemoryFree(inst->pHandle);
  }

  // per-column-family options and their block-based table options
  if (inst->cfOpt != NULL) {
    RocksdbCfParam* pParams = (RocksdbCfParam*)inst->param;
    for (int idx = 0; idx < nCf; ++idx) {
      rocksdb_options_destroy(inst->cfOpt[idx]);
      rocksdb_block_based_options_destroy(pParams[idx].tableOpt);
    }
    taosMemoryFreeClear(inst->cfOpt);
    taosMemoryFreeClear(inst->param);
  }

  if (inst->wOpt != NULL) {
    rocksdb_writeoptions_destroy(inst->wOpt);
  }
  if (inst->rOpt != NULL) {
    rocksdb_readoptions_destroy(inst->rOpt);
  }

  taosMemoryFree(inst);
}
×
1713

1714
// |key|-----value------|
1715
// |key|ttl|len|userData|
1716

1717
/*
 * Byte-wise comparator for raw keys.
 *
 * The common prefix of the two buffers is compared with memcmp(); when the
 * prefixes are equal the shorter key orders first.
 *
 * state: unused.
 * Returns the memcmp() result if the common prefixes differ, otherwise
 * -1 / 0 / 1 according to the lengths.
 */
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  // keep the length as size_t: narrowing it to int would turn very large
  // lengths negative and then hand memcmp() a bogus size
  size_t common = aLen < bLen ? aLen : bLen;

  int ret = memcmp(aBuf, bBuf, common);
  if (ret != 0) {
    return ret;
  }

  if (aLen == bLen) {
    return 0;
  }
  return aLen < bLen ? -1 : 1;
}
1731
/*
 * Decide whether an encoded value has expired.
 *
 * The first 8 bytes of `v` are decoded as an int64 timestamp. A timestamp
 * of 0 is never stale; otherwise the value is stale once the timestamp is
 * earlier than the current time in milliseconds.
 *
 * Returns 1 when stale, 0 otherwise.
 */
int streamStateValueIsStale(char* v) {
  int64_t expireAt = 0;
  TAOS_UNUSED(taosDecodeFixedI64(v, &expireAt));

  if (expireAt == 0) {
    return 0;
  }
  return expireAt < taosGetTimestampMs() ? 1 : 0;
}
1736
/*
 * Staleness check for the value the iterator currently points at.
 * Returns whatever streamStateValueIsStale() reports for that value.
 */
int iterValueIsStale(rocksdb_iterator_t* iter) {
  size_t      valLen;  // filled in by rocksdb_iter_value(); not used further
  const char* pVal = rocksdb_iter_value(iter, &valLen);
  return streamStateValueIsStale((char*)pVal);
}
1741
/*
 * Encode a NUL-terminated string key by copying its characters into `buf`.
 * The terminator is not copied.
 *
 * Returns the number of bytes written (the string length).
 */
int defaultKeyEncode(void* k, char* buf) {
  const char* src = (const char*)k;
  int         nBytes = strlen(src);

  memcpy(buf, src, nBytes);
  return nBytes;
}
1746
/*
 * Decode a string key: copy the characters of the NUL-terminated `buf`
 * into `k`. No terminator is written to `k`.
 *
 * Returns the number of bytes copied.
 */
int defaultKeyDecode(void* k, char* buf) {
  int nBytes = strlen(buf);

  memcpy(k, buf, nBytes);
  return nBytes;
}
1751
/*
 * Debug helper: render a string key into `buf` as "key: <k>".
 * `buf` must be large enough for the result; no bound is checked here.
 *
 * Returns the number of characters written, as reported by sprintf().
 */
int defaultKeyToString(void* k, char* buf) {
  const char* keyStr = (const char*)k;
  int         written = sprintf(buf, "key: %s", keyStr);
  return written;
}
1755
//
1756
//  SStateKey
1757
//  |--groupid--|---ts------|--opNum----|
1758
//  |--uint64_t-|-uint64_t--|--int64_t--|
1759
//
1760
//
1761
//
1762
int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
745,913✔
1763
  SStateKey key1, key2;
1764
  memset(&key1, 0, sizeof(key1));
745,913✔
1765
  memset(&key2, 0, sizeof(key2));
745,913✔
1766

1767
  char* p1 = (char*)aBuf;
745,913✔
1768
  char* p2 = (char*)bBuf;
745,913✔
1769

1770
  p1 = taosDecodeFixedU64(p1, &key1.key.groupId);
745,913✔
1771
  p2 = taosDecodeFixedU64(p2, &key2.key.groupId);
745,913✔
1772

1773
  p1 = taosDecodeFixedI64(p1, &key1.key.ts);
745,913✔
1774
  p2 = taosDecodeFixedI64(p2, &key2.key.ts);
745,913✔
1775

1776
  TAOS_UNUSED(taosDecodeFixedI64(p1, &key1.opNum));
1777
  TAOS_UNUSED(taosDecodeFixedI64(p2, &key2.opNum));
1778

1779
  return stateKeyCmpr(&key1, sizeof(key1), &key2, sizeof(key2));
745,913✔
1780
}
1781

1782
int stateKeyEncode(void* k, char* buf) {
126,967✔
1783
  SStateKey* key = k;
126,967✔
1784
  int        len = 0;
126,967✔
1785
  len += taosEncodeFixedU64((void**)&buf, key->key.groupId);
126,967!
1786
  len += taosEncodeFixedI64((void**)&buf, key->key.ts);
126,967!
1787
  len += taosEncodeFixedI64((void**)&buf, key->opNum);
126,967!
1788
  return len;
126,967✔
1789
}
1790
int stateKeyDecode(void* k, char* buf) {
9✔
1791
  SStateKey* key = k;
9✔
1792
  int        len = 0;
9✔
1793
  char*      p = buf;
9✔
1794
  p = taosDecodeFixedU64(p, &key->key.groupId);
9!
1795
  p = taosDecodeFixedI64(p, &key->key.ts);
9!
1796
  p = taosDecodeFixedI64(p, &key->opNum);
9!
1797
  return p - buf;
9✔
1798
}
1799

1800
int stateKeyToString(void* k, char* buf) {
123,651✔
1801
  SStateKey* key = k;
123,651✔
1802
  int        n = 0;
123,651✔
1803
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->key.groupId);
123,651✔
1804
  n += sprintf(buf + n, "ts:%" PRIi64 ",", key->key.ts);
123,651✔
1805
  n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
123,651✔
1806
  return n;
123,651✔
1807
}
1808

1809
//
1810
// SStateSessionKey
1811
//  |-----------SSessionKey----------|
1812
//  |-----STimeWindow-----|
1813
//  |---skey--|---ekey----|--groupId-|--opNum--|
1814
//  |---int64-|--int64_t--|--uint64--|--int64_t|
1815
// |
1816
//
1817
int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
186,193✔
1818
  SStateSessionKey w1, w2;
1819
  memset(&w1, 0, sizeof(w1));
186,193✔
1820
  memset(&w2, 0, sizeof(w2));
186,193✔
1821

1822
  char* p1 = (char*)aBuf;
186,193✔
1823
  char* p2 = (char*)bBuf;
186,193✔
1824

1825
  p1 = taosDecodeFixedI64(p1, &w1.key.win.skey);
186,193✔
1826
  p2 = taosDecodeFixedI64(p2, &w2.key.win.skey);
186,193✔
1827

1828
  p1 = taosDecodeFixedI64(p1, &w1.key.win.ekey);
186,193✔
1829
  p2 = taosDecodeFixedI64(p2, &w2.key.win.ekey);
186,193✔
1830

1831
  p1 = taosDecodeFixedU64(p1, &w1.key.groupId);
186,193✔
1832
  p2 = taosDecodeFixedU64(p2, &w2.key.groupId);
186,193✔
1833

1834
  p1 = taosDecodeFixedI64(p1, &w1.opNum);
186,193✔
1835
  p2 = taosDecodeFixedI64(p2, &w2.opNum);
186,193✔
1836

1837
  return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
186,193✔
1838
}
1839
int stateSessionKeyEncode(void* k, char* buf) {
7,017✔
1840
  SStateSessionKey* sess = k;
7,017✔
1841
  int               len = 0;
7,017✔
1842
  len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey);
7,017!
1843
  len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey);
7,017!
1844
  len += taosEncodeFixedU64((void**)&buf, sess->key.groupId);
7,017!
1845
  len += taosEncodeFixedI64((void**)&buf, sess->opNum);
7,017!
1846
  return len;
7,017✔
1847
}
1848
int stateSessionKeyDecode(void* k, char* buf) {
3,044✔
1849
  SStateSessionKey* sess = k;
3,044✔
1850
  int               len = 0;
3,044✔
1851

1852
  char* p = buf;
3,044✔
1853
  p = taosDecodeFixedI64(p, &sess->key.win.skey);
3,044!
1854
  p = taosDecodeFixedI64(p, &sess->key.win.ekey);
3,044!
1855
  p = taosDecodeFixedU64(p, &sess->key.groupId);
3,044!
1856
  p = taosDecodeFixedI64(p, &sess->opNum);
3,044!
1857
  return p - buf;
3,044✔
1858
}
1859
int stateSessionKeyToString(void* k, char* buf) {
938✔
1860
  SStateSessionKey* key = k;
938✔
1861
  int               n = 0;
938✔
1862
  n += sprintf(buf + n, "[skey:%" PRIi64 ",", key->key.win.skey);
938✔
1863
  n += sprintf(buf + n, "ekey:%" PRIi64 ",", key->key.win.ekey);
938✔
1864
  n += sprintf(buf + n, "groupId:%" PRIu64 ",", key->key.groupId);
938✔
1865
  n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
938✔
1866
  return n;
938✔
1867
}
1868

1869
/**
1870
 *  SWinKey
1871
 *  |------groupId------|-----ts------|
1872
 *  |------uint64-------|----int64----|
1873
 */
1874
int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
59,951✔
1875
  SWinKey w1, w2;
1876
  memset(&w1, 0, sizeof(w1));
59,951✔
1877
  memset(&w2, 0, sizeof(w2));
59,951✔
1878

1879
  char* p1 = (char*)aBuf;
59,951✔
1880
  char* p2 = (char*)bBuf;
59,951!
1881

1882
  p1 = taosDecodeFixedU64(p1, &w1.groupId);
59,951!
1883
  p2 = taosDecodeFixedU64(p2, &w2.groupId);
59,951!
1884

1885
  p1 = taosDecodeFixedI64(p1, &w1.ts);
59,951✔
1886
  p2 = taosDecodeFixedI64(p2, &w2.ts);
59,951✔
1887

1888
  return winKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
59,951✔
1889
}
1890

1891
int winKeyEncode(void* k, char* buf) {
5,292✔
1892
  SWinKey* key = k;
5,292✔
1893
  int      len = 0;
5,292✔
1894
  len += taosEncodeFixedU64((void**)&buf, key->groupId);
5,292!
1895
  len += taosEncodeFixedI64((void**)&buf, key->ts);
5,292!
1896
  return len;
5,292✔
1897
}
1898

1899
int winKeyDecode(void* k, char* buf) {
4,375✔
1900
  SWinKey* key = k;
4,375✔
1901
  int      len = 0;
4,375✔
1902
  char*    p = buf;
4,375✔
1903
  p = taosDecodeFixedU64(p, &key->groupId);
4,375!
1904
  p = taosDecodeFixedI64(p, &key->ts);
4,375!
1905
  return len;
4,375✔
1906
}
1907

1908
int winKeyToString(void* k, char* buf) {
958✔
1909
  SWinKey* key = k;
958✔
1910
  int      n = 0;
958✔
1911
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
958✔
1912
  n += sprintf(buf + n, "ts:%" PRIi64 "]", key->ts);
958✔
1913
  return n;
958✔
1914
}
1915
/*
1916
 * STupleKey
1917
 * |---groupId---|---ts---|---exprIdx---|
1918
 * |---uint64--|---int64--|---int32-----|
1919
 */
1920
int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
×
1921
  STupleKey w1, w2;
1922
  memset(&w1, 0, sizeof(w1));
×
1923
  memset(&w2, 0, sizeof(w2));
×
1924

1925
  char* p1 = (char*)aBuf;
×
1926
  char* p2 = (char*)bBuf;
×
1927

1928
  p1 = taosDecodeFixedU64(p1, &w1.groupId);
×
1929
  p2 = taosDecodeFixedU64(p2, &w2.groupId);
×
1930

1931
  p1 = taosDecodeFixedI64(p1, &w1.ts);
×
1932
  p2 = taosDecodeFixedI64(p2, &w2.ts);
×
1933

1934
  p1 = taosDecodeFixedI32(p1, &w1.exprIdx);
×
1935
  p2 = taosDecodeFixedI32(p2, &w2.exprIdx);
×
1936

1937
  return STupleKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
×
1938
}
1939

1940
int tupleKeyEncode(void* k, char* buf) {
×
1941
  STupleKey* key = k;
×
1942
  int        len = 0;
×
1943
  len += taosEncodeFixedU64((void**)&buf, key->groupId);
×
1944
  len += taosEncodeFixedI64((void**)&buf, key->ts);
×
1945
  len += taosEncodeFixedI32((void**)&buf, key->exprIdx);
×
1946
  return len;
×
1947
}
1948
int tupleKeyDecode(void* k, char* buf) {
×
1949
  STupleKey* key = k;
×
1950
  int        len = 0;
×
1951
  char*      p = buf;
×
1952
  p = taosDecodeFixedU64(p, &key->groupId);
×
1953
  p = taosDecodeFixedI64(p, &key->ts);
×
1954
  p = taosDecodeFixedI32(p, &key->exprIdx);
×
1955
  return len;
×
1956
}
1957
int tupleKeyToString(void* k, char* buf) {
×
1958
  int        n = 0;
×
1959
  STupleKey* key = k;
×
1960
  n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId);
×
1961
  n += sprintf(buf + n, "ts:%" PRIi64 ",", key->ts);
×
1962
  n += sprintf(buf + n, "exprIdx:%d]", key->exprIdx);
×
1963
  return n;
×
1964
}
1965

1966
/*
 * RocksDB comparator for partition keys: each buffer holds one encoded
 * int64, and the keys are ordered by that value.
 * `state`, `aLen` and `bLen` are not used.
 *
 * Returns -1, 0 or 1.
 */
int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
  int64_t lhs = 0;
  int64_t rhs = 0;

  TAOS_UNUSED(taosDecodeFixedI64((char*)aBuf, &lhs));
  TAOS_UNUSED(taosDecodeFixedI64((char*)bBuf, &rhs));

  if (lhs < rhs) {
    return -1;
  }
  if (lhs > rhs) {
    return 1;
  }
  return 0;
}
1981
/*
 * Serialise a partition key (a single int64 pointed to by `k`) into `buf`.
 * Returns the byte count reported by the encoder.
 */
int parKeyEncode(void* k, char* buf) {
  const int64_t* pGroupId = k;
  return taosEncodeFixedI64((void**)&buf, *pGroupId);
}
1986
/*
 * Deserialise a partition key: one int64 is read from `buf` into `*k`.
 * Returns the number of bytes consumed from `buf`.
 */
int parKeyDecode(void* k, char* buf) {
  int64_t* pGroupId = k;
  char*    cursor = taosDecodeFixedI64(buf, pGroupId);

  return cursor - buf;
}
1993
/*
 * Render a partition key (int64 pointed to by `k`) into `buf` as
 * "[groupId:<value>]". `buf` must be large enough; no bound is checked.
 * Returns the number of characters written.
 */
int parKeyToString(void* k, char* buf) {
  const int64_t* pGroupId = k;
  return sprintf(buf, "[groupId:%" PRIi64 "]", *pGroupId);
}
1999
int32_t valueToString(void* k, char* buf) {
×
2000
  SStreamValue* key = k;
×
2001
  int           n = 0;
×
2002
  n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp);
×
2003
  n += sprintf(buf + n, "len:%d,", key->len);
×
2004
  n += sprintf(buf + n, "data:%s]", key->data);
×
2005
  return n;
×
2006
}
2007

2008
/* returns 1 if the value is stale, 0 if it is not */
2009
int32_t valueIsStale(void* k, int64_t ts) {
×
2010
  SStreamValue* key = k;
×
2011
  if (key->unixTimestamp < ts) {
×
2012
    return 1;
×
2013
  }
2014
  return 0;
×
2015
}
2016

2017
// Destructor callback for comparators; there is nothing to release, so the
// argument is simply ignored.
void destroyCompare(void* arg) {
  TAOS_UNUSED(arg);
}
2021

2022
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
125,346✔
2023
  int32_t      code = 0;
125,346✔
2024
  SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .rawLen = vlen, .compress = 0, .data = (char*)(value)};
125,346✔
2025
  int32_t      len = 0;
125,346✔
2026
  char*        dst = NULL;
125,346✔
2027
  if (vlen > 512) {
125,346✔
2028
    dst = taosMemoryCalloc(1, vlen + 128);
1,729✔
2029
    if (dst == NULL) {
1,729!
2030
      return terrno;
×
2031
    }
2032
    int32_t dstCap = vlen + 128;
1,729✔
2033
    int32_t compressedSize = LZ4_compress_default((char*)value, dst, vlen, dstCap);
1,729✔
2034
    if (compressedSize < vlen) {
1,729!
2035
      key.compress = 1;
1,729✔
2036
      key.len = compressedSize;
1,729✔
2037
      value = dst;
1,729✔
2038
    }
2039
  }
2040

2041
  if (*dest == NULL) {
125,346✔
2042
    size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
10,958✔
2043
    char*  p = taosMemoryCalloc(1, size);
10,958✔
2044
    if (p == NULL) {
10,958!
2045
      code = terrno;
×
2046
      goto _exception;
×
2047
    }
2048
    char* buf = p;
10,958✔
2049
    len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
10,958!
2050
    len += taosEncodeFixedI32((void**)&buf, key.len);
10,958!
2051
    len += taosEncodeFixedI32((void**)&buf, key.rawLen);
10,958!
2052
    len += taosEncodeFixedI8((void**)&buf, key.compress);
10,958!
2053
    if (value != NULL && key.len != 0) {
10,958✔
2054
      len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
14,708!
2055
    }
2056
    *dest = p;
10,958✔
2057
  } else {
2058
    char* buf = *dest;
114,388✔
2059
    len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
114,388!
2060
    len += taosEncodeFixedI32((void**)&buf, key.len);
114,388!
2061
    len += taosEncodeFixedI32((void**)&buf, key.rawLen);
114,388!
2062
    len += taosEncodeFixedI8((void**)&buf, key.compress);
114,388!
2063
    if (value != NULL && key.len != 0) {
114,388!
2064
      len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
228,776!
2065
    }
2066
  }
2067

2068
  taosMemoryFree(dst);
125,346✔
2069
  return len;
125,346✔
2070
_exception:
×
2071
  taosMemoryFree(dst);
×
2072
  return code;
×
2073
}
2074

2075
/*
2076
 *  ret >= 0 : found valid value
2077
 *  ret < 0 : error or timeout
2078
 */
2079
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest) {
7,564✔
2080
  int32_t      code = -1;
7,564✔
2081
  SStreamValue key = {0};
7,564✔
2082
  char*        p = value;
7,564✔
2083

2084
  char* pCompressData = NULL;
7,564✔
2085
  char* pOutput = NULL;
7,564✔
2086
  if (streamStateValueIsStale(p)) {
7,564!
2087
    code = TSDB_CODE_INVALID_DATA_FMT;
×
2088
    goto _EXCEPT;
×
2089
  }
2090

2091
  p = taosDecodeFixedI64(p, &key.unixTimestamp);
7,564!
2092
  p = taosDecodeFixedI32(p, &key.len);
7,564✔
2093
  if (key.len == 0) {
7,564!
2094
    code = 0;
×
2095
    goto _EXCEPT;
×
2096
  }
2097
  if (vlen == (sizeof(key.unixTimestamp) + sizeof(key.len) + key.len)) {
7,564!
2098
    // compatiable with previous data
2099
    p = taosDecodeBinary(p, (void**)&pOutput, key.len);
×
2100
    if (p == NULL) {
×
2101
      code = terrno;
×
2102
      goto _EXCEPT;
×
2103
    }
2104

2105
  } else {
2106
    p = taosDecodeFixedI32(p, &key.rawLen);
7,564✔
2107
    p = taosDecodeFixedI8(p, &key.compress);
7,564✔
2108
    if (vlen != (sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len)) {
7,564!
2109
      stError("vlen: %d, read len: %d", vlen, key.len);
×
2110
      code = TSDB_CODE_INVALID_DATA_FMT;
×
2111
      goto _EXCEPT;
×
2112
    }
2113
    if (key.compress == 1) {
7,564✔
2114
      p = taosDecodeBinary(p, (void**)&pCompressData, key.len);
2,364✔
2115
      if (p == NULL) {
2,364!
2116
        code = terrno;
×
2117
        goto _EXCEPT;
×
2118
      }
2119
      pOutput = taosMemoryCalloc(1, key.rawLen);
2,364✔
2120
      if (pOutput == NULL) {
2,363!
2121
        code = terrno;
×
2122
        goto _EXCEPT;
×
2123
      }
2124

2125
      int32_t rawLen = LZ4_decompress_safe(pCompressData, pOutput, key.len, key.rawLen);
2,363✔
2126
      if (rawLen != key.rawLen) {
2,364!
2127
        stError("read invalid read, rawlen: %d, currlen: %d", key.rawLen, key.len);
×
2128
        code = TSDB_CODE_INVALID_DATA_FMT;
×
2129
        goto _EXCEPT;
×
2130
      }
2131
      key.len = rawLen;
2,364✔
2132
    } else {
2133
      p = taosDecodeBinary(p, (void**)&pOutput, key.len);
5,200✔
2134
      if (p == NULL) {
5,200!
2135
        code = terrno;
×
2136
        goto _EXCEPT;
×
2137
      }
2138
    }
2139
  }
2140

2141
  if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
7,564!
2142

2143
  code = 0;
7,564✔
2144
  if (dest) {
7,564✔
2145
    *dest = pOutput;
7,270✔
2146
    pOutput = NULL;
7,270✔
2147
  }
2148
  taosMemoryFree(pCompressData);
7,564✔
2149
  taosMemoryFree(pOutput);
7,564✔
2150
  return key.len;
7,564✔
2151

2152
_EXCEPT:
×
2153
  if (dest != NULL) *dest = NULL;
×
2154
  if (ttl != NULL) *ttl = 0;
×
2155

2156
  taosMemoryFree(pOutput);
×
2157
  taosMemoryFree(pCompressData);
×
2158
  return code;
×
2159
}
2160

2161
const char* compareDefaultName(void* arg) {
43,416✔
2162
  TAOS_UNUSED(arg);
2163
  return ginitDict[0].key;
43,416✔
2164
}
2165
const char* compareStateName(void* arg) {
16,602✔
2166
  TAOS_UNUSED(arg);
2167
  return ginitDict[1].key;
16,602✔
2168
}
2169
// Name callback: returns the key string of ginitDict[2]; `arg` is ignored.
const char* compareWinKeyName(void* arg) {
  const char* name = ginitDict[2].key;
  return name;
}
898✔
2170
const char* compareSessionKeyName(void* arg) {
2,889✔
2171
  TAOS_UNUSED(arg);
2172
  return ginitDict[3].key;
2,889✔
2173
}
2174
const char* compareFuncKeyName(void* arg) {
×
2175
  TAOS_UNUSED(arg);
2176
  return ginitDict[4].key;
×
2177
}
2178
const char* compareParKeyName(void* arg) {
9,344✔
2179
  TAOS_UNUSED(arg);
2180
  return ginitDict[5].key;
9,344✔
2181
}
2182
const char* comparePartagKeyName(void* arg) {
234✔
2183
  TAOS_UNUSED(arg);
2184
  return ginitDict[6].key;
234✔
2185
}
2186

2187
// Destructor callback for compaction filter factories. Nothing is released
// here, regardless of whether `arg` is NULL.
void destroyCompactFilteFactory(void* arg) {
  (void)arg;
}
2190
// Name callback for the default compaction filter factory; `arg` is not used.
const char* compactFilteFactoryName(void* arg) {
  return "stream_compact_factory_filter_default";
}
2194
// Name callback for the session compaction filter factory; `arg` is not used.
const char* compactFilteFactoryNameSess(void* arg) {
  return "stream_compact_factory_filter_sess";
}
2198
// Name callback for the state compaction filter factory; `arg` is not used.
const char* compactFilteFactoryNameState(void* arg) {
  return "stream_compact_factory_filter_state";
}
2202
// Name callback for the fill compaction filter factory; `arg` is not used.
const char* compactFilteFactoryNameFill(void* arg) {
  return "stream_compact_factory_filter_fill";
}
2206
// Name callback for the func compaction filter factory; `arg` is not used.
const char* compactFilteFactoryNameFunc(void* arg) {
  return "stream_compact_factory_filter_func";
}
2210

2211
// Destructor callback for compaction filters; there is nothing to release.
void destroyCompactFilte(void* arg) {
  TAOS_UNUSED(arg);
}
182✔
2212
/*
 * Default compaction filter callback. Only `val` is inspected: the result
 * is 1 when streamStateValueIsStale() reports the value as stale and 0
 * otherwise. `newval`, `newvlen` and `value_changed` are never written.
 */
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                           char** newval, size_t* newvlen, unsigned char* value_changed) {
  if (streamStateValueIsStale((char*)val)) {
    return 1;
  }
  return 0;
}
2216
// Name callback for the default compaction filter; `arg` is not used.
const char* compactFilteName(void* arg) {
  return "stream_filte_default";
}
×
2217
// Name callback for the session compaction filter; `arg` is not used.
const char* compactFilteNameSess(void* arg) {
  return "stream_filte_sess";
}
×
2218
// Name callback for the state compaction filter; `arg` is not used.
const char* compactFilteNameState(void* arg) {
  return "stream_filte_state";
}
×
2219
// Name callback for the fill compaction filter; `arg` is not used.
const char* compactFilteNameFill(void* arg) {
  return "stream_filte_fill";
}
×
2220
// Name callback for the func compaction filter; `arg` is not used.
const char* compactFilteNameFunc(void* arg) {
  return "stream_filte_func";
}
×
2221

2222
/*
 * Session compaction filter callback. Filtering is not implemented yet, so
 * every call returns 0 and no argument is read or written.
 */
unsigned char compactFilteSess(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
  const unsigned char result = 0;  // not impl yet
  return result;
}
2227

2228
/*
 * State compaction filter callback. Filtering is not implemented yet, so
 * every call returns 0 and no argument is read or written.
 */
unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                                char** newval, size_t* newvlen, unsigned char* value_changed) {
  const unsigned char result = 0;  // not impl yet
  return result;
}
2233

2234
/*
 * Fill compaction filter callback. Filtering is not implemented yet, so
 * every call returns 0 and no argument is read or written.
 */
unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
  const unsigned char result = 0;  // not impl yet
  return result;
}
2239

2240
/*
 * Func compaction filter callback. Filtering is not implemented yet, so
 * every call returns 0 and no argument is read or written.
 */
unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
                               char** newval, size_t* newvlen, unsigned char* value_changed) {
  // not impl yet; the staleness-based variant is kept below for reference
  // return streamStateValueIsStale((char*)val) ? 1 : 0;
  const unsigned char result = 0;
  return result;
}
2246

2247
/*
 * Factory callback: build the default compaction filter.
 * `arg` is handed to the new filter as its state pointer; `ctx` is unused.
 */
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  SCompactFilteFactory* pFactory = arg;
  return rocksdb_compactionfilter_create(pFactory, destroyCompactFilte, compactFilte, compactFilteName);
}
2253
/*
 * Factory callback: build the session compaction filter.
 * `arg` is handed to the new filter as its state pointer; `ctx` is unused.
 */
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterSess(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  SCompactFilteFactory* pFactory = arg;
  return rocksdb_compactionfilter_create(pFactory, destroyCompactFilte, compactFilteSess, compactFilteNameSess);
}
2259
/*
 * Factory callback: build the state compaction filter.
 * `arg` is handed to the new filter as its state pointer; `ctx` is unused.
 */
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterState(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  SCompactFilteFactory* pFactory = arg;
  return rocksdb_compactionfilter_create(pFactory, destroyCompactFilte, compactFilteState, compactFilteNameState);
}
2265
/*
 * Factory callback: build the fill compaction filter.
 * `arg` is handed to the new filter as its state pointer; `ctx` is unused.
 */
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFill(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  SCompactFilteFactory* pFactory = arg;
  return rocksdb_compactionfilter_create(pFactory, destroyCompactFilte, compactFilteFill, compactFilteNameFill);
}
2271
/*
 * Factory callback: build the func compaction filter.
 * `arg` is handed to the new filter as its state pointer; `ctx` is unused.
 */
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilterFunc(void* arg, rocksdb_compactionfiltercontext_t* ctx) {
  SCompactFilteFactory* pFactory = arg;
  return rocksdb_compactionfilter_create(pFactory, destroyCompactFilte, compactFilteFunc, compactFilteNameFunc);
}
2277

2278
/*
 * Open the task database at `path` with the given column families.
 *
 * For each name in pCfNames the matching options are looked up in
 * pTask->pCfOpts via getCfIdx(); after a successful open the returned
 * handles are stored in pTask->pCf at the same indices and pTask->db is set.
 *
 * Returns 0 on success, terrno if a temporary array cannot be allocated,
 * and -1 if RocksDB fails to open the column families.
 *
 * NOTE(review): the result of getCfIdx() is used as an array index without
 * a range check - confirm it cannot report "not found" for these names.
 */
int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) {
  int32_t                          code = -1;
  char*                            err = NULL;
  rocksdb_t*                       db = NULL;
  rocksdb_options_t**              cfOpts = NULL;
  rocksdb_column_family_handle_t** cfHandle = NULL;

  cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
  if (cfOpts == NULL) {
    code = terrno;
    goto _EXIT;
  }
  cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
  if (cfHandle == NULL) {
    // go through the common cleanup so that cfOpts is not leaked
    code = terrno;
    goto _EXIT;
  }

  for (int i = 0; i < nCf; i++) {
    int32_t idx = getCfIdx(pCfNames[i]);
    cfOpts[i] = pTask->pCfOpts[idx];
  }

  db = rocksdb_open_column_families(pTask->dbOpt, path, nCf, (const char* const*)pCfNames,
                                    (const rocksdb_options_t* const*)cfOpts, cfHandle, &err);

  if (err != NULL) {
    stError("failed to open cf path: %s", err);
    taosMemoryFree(err);
    goto _EXIT;
  }

  for (int i = 0; i < nCf; i++) {
    int32_t idx = getCfIdx(pCfNames[i]);
    pTask->pCf[idx] = cfHandle[i];
  }

  pTask->db = db;
  code = 0;

_EXIT:
  // only the temporary arrays are released; the handles now live in pTask->pCf
  taosMemoryFree(cfOpts);
  taosMemoryFree(cfHandle);
  return code;
}
2318

2319
void* taskDbAddRef(void* pTaskDb) {
1,404✔
2320
  STaskDbWrapper* pBackend = pTaskDb;
1,404✔
2321
  return taosAcquireRef(taskDbWrapperId, pBackend->refId);
1,404✔
2322
}
2323

2324
void taskDbRemoveRef(void* pTaskDb) {
4,033✔
2325
  if (pTaskDb == NULL) {
4,033!
2326
    return;
×
2327
  }
2328

2329
  STaskDbWrapper* pBackend = pTaskDb;
4,033✔
2330
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, pBackend->refId));
4,033✔
2331
}
2332

2333
void taskDbSetClearFileFlag(void* pTaskDb) {
1,703✔
2334
  if (pTaskDb == NULL) {
1,703!
2335
    return;
×
2336
  }
2337

2338
  STaskDbWrapper* pBackend = pTaskDb;
1,703✔
2339
  atomic_store_8(&pBackend->removeAllFiles, 1);
1,703✔
2340
}
2341

2342
/*
 * Create every RocksDB option object used by a task database and store
 * them in `pTaskDb`:
 *   - env, block cache and the db-wide options (dbOpt);
 *   - the default compaction filter factory, attached to dbOpt;
 *   - read options, and write options with the WAL disabled;
 *   - one options copy, table options, comparator and compaction filter
 *     factory per entry of ginitDict.
 *
 * If any of the per-column-family arrays cannot be allocated, an error is
 * logged, the arrays are released and the function returns early. No status
 * is reported to the caller.
 */
void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
  rocksdb_env_t* pEnv = rocksdb_create_default_env();

  // NOTE(review): capacity passed to the LRU cache is the literal 256 - confirm this is the intended size
  rocksdb_cache_t*   pCache = rocksdb_cache_create_lru(256);
  rocksdb_options_t* pDbOpt = rocksdb_options_create();
  rocksdb_options_set_env(pDbOpt, pEnv);
  rocksdb_options_set_create_if_missing(pDbOpt, 1);
  rocksdb_options_set_create_missing_column_families(pDbOpt, 1);
  // rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
  // rocksdb_options_set_ecycle_log_file_num(opts, 6);
  rocksdb_options_set_max_write_buffer_number(pDbOpt, 3);
  rocksdb_options_set_info_log_level(pDbOpt, 1);
  rocksdb_options_set_db_write_buffer_size(pDbOpt, 256 << 20);
  rocksdb_options_set_write_buffer_size(pDbOpt, 128 << 20);
  rocksdb_options_set_atomic_flush(pDbOpt, 1);

  pTaskDb->dbOpt = pDbOpt;
  pTaskDb->env = pEnv;
  pTaskDb->cache = pCache;

  pTaskDb->filterFactory = rocksdb_compactionfilterfactory_create(
      NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
  rocksdb_options_set_compaction_filter_factory(pTaskDb->dbOpt, pTaskDb->filterFactory);

  pTaskDb->readOpt = rocksdb_readoptions_create();
  pTaskDb->writeOpt = rocksdb_writeoptions_create();
  rocksdb_writeoptions_disable_WAL(pTaskDb->writeOpt, 1);

  // one slot per column family described by ginitDict
  const size_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
  pTaskDb->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
  pTaskDb->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
  pTaskDb->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
  pTaskDb->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
  if (pTaskDb->pCf == NULL || pTaskDb->pCfParams == NULL || pTaskDb->pCfOpts == NULL || pTaskDb->pCompares == NULL) {
    stError("failed to alloc memory for cf");
    taosMemoryFreeClear(pTaskDb->pCf);
    taosMemoryFreeClear(pTaskDb->pCfParams);
    taosMemoryFreeClear(pTaskDb->pCfOpts);
    taosMemoryFreeClear(pTaskDb->pCompares);
    return;
  }

  for (size_t i = 0; i < nCf; ++i) {
    SCfInit* pInit = &ginitDict[i];

    // start from a copy of the db-wide options
    rocksdb_options_t* pCfOpt = rocksdb_options_create_copy(pTaskDb->dbOpt);

    // table options: shared block cache, partitioned filters, bloom filter
    rocksdb_block_based_table_options_t* pTableOpt = rocksdb_block_based_options_create();
    rocksdb_block_based_options_set_block_cache(pTableOpt, pTaskDb->cache);
    rocksdb_block_based_options_set_partition_filters(pTableOpt, 1);

    rocksdb_filterpolicy_t* pBloom = rocksdb_filterpolicy_create_bloom(15);
    rocksdb_block_based_options_set_filter_policy(pTableOpt, pBloom);

    rocksdb_options_set_block_based_table_factory(pCfOpt, pTableOpt);

    // key ordering for this column family
    rocksdb_comparator_t* pCompare =
        rocksdb_comparator_create(NULL, pInit->destroyCmp, pInit->cmpKey, pInit->cmpName);
    rocksdb_options_set_comparator(pCfOpt, pCompare);

    // compaction filter factory for this column family
    rocksdb_compactionfilterfactory_t* pFactory =
        rocksdb_compactionfilterfactory_create(NULL, pInit->destroyFilter, pInit->createFilter, pInit->funcName);
    rocksdb_options_set_compaction_filter_factory(pCfOpt, pFactory);

    pTaskDb->pCompares[i] = pCompare;
    pTaskDb->pCfOpts[i] = pCfOpt;
    pTaskDb->pCfParams[i].tableOpt = pTableOpt;
  }
}
2409
/*
 * Initialise the checkpoint bookkeeping of a task database: no current
 * checkpoint (chkpId = -1), a capacity of 4, the saved-checkpoint array
 * (then populated via taskDbLoadChkpInfo()), the in-use array and the
 * checkpoint directory rwlock. Results of the load and of the lock
 * initialisation are discarded.
 */
void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) {
  pTaskDb->chkpId = -1;
  pTaskDb->chkpCap = 4;

  pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t));
  TAOS_UNUSED(taskDbLoadChkpInfo(pTaskDb));

  pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t));

  TAOS_UNUSED(taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL));
}
2,727✔
2419

2420
/*
 * Mark checkpoint `chkp` as in use: under the checkpoint-dir write lock the
 * id is appended to chkpInUse and the array is re-sorted with chkpIdComp.
 * A failed append is logged; the sort still runs.
 */
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
  TAOS_UNUSED(taosThreadRwlockWrlock(&pTaskDb->chkpDirLock));

  void* pPushed = taosArrayPush(pTaskDb->chkpInUse, &chkp);
  if (pPushed == NULL) {
    stError("failed to push chkp: %" PRIi64 " into inuse", chkp);
  }
  taosArraySort(pTaskDb->chkpInUse, chkpIdComp);

  TAOS_UNUSED(taosThreadRwlockUnlock(&pTaskDb->chkpDirLock));
}
×
2428

2429
/*
 * Drop one in-use mark for checkpoint `chkp`: under the checkpoint-dir
 * write lock the first matching entry of chkpInUse is removed. Nothing
 * happens if the id is not present.
 */
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
  TAOS_UNUSED(taosThreadRwlockWrlock(&pTaskDb->chkpDirLock));

  int32_t nInUse = taosArrayGetSize(pTaskDb->chkpInUse);
  int32_t pos = 0;
  while (pos < nInUse) {
    int64_t* pId = taosArrayGet(pTaskDb->chkpInUse, pos);
    if (*pId == chkp) {
      taosArrayRemove(pTaskDb->chkpInUse, pos);
      break;
    }
    pos++;
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&pTaskDb->chkpDirLock));
}
×
2441

2442
// Tear down the checkpoint bookkeeping: both id arrays are destroyed, then
// the checkpoint directory rwlock (its result is discarded).
void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
  taosArrayDestroy(pTaskDb->chkpSaved);
  taosArrayDestroy(pTaskDb->chkpInUse);

  TAOS_UNUSED(taosThreadRwlockDestroy(&pTaskDb->chkpDirLock));
}
2,629✔
2447

2448
int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
×
2449
  int32_t code = 0;
×
2450
  char*   statePath = taosMemoryCalloc(1, strlen(path) + 128);
×
2451
  if (statePath == NULL) {
×
2452
    return terrno;
×
2453
  }
2454

2455
  sprintf(statePath, "%s%s%s", path, TD_DIRSEP, key);
×
2456
  if (!taosDirExist(statePath)) {
×
2457
    code = taosMulMkDir(statePath);
×
2458
    if (code != 0) {
×
2459
      code = TAOS_SYSTEM_ERROR(errno);
×
2460
      stError("failed to create dir: %s, reason:%s", statePath, tstrerror(code));
×
2461
      taosMemoryFree(statePath);
×
2462
      return code;
×
2463
    }
2464
  }
2465

2466
  char* dbPath = taosMemoryCalloc(1, strlen(statePath) + 128);
×
2467
  if (dbPath == NULL) {
×
2468
    taosMemoryFree(statePath);
×
2469
    return terrno;
×
2470
  }
2471

2472
  sprintf(dbPath, "%s%s%s", statePath, TD_DIRSEP, "state");
×
2473
  if (!taosDirExist(dbPath)) {
×
2474
    code = taosMulMkDir(dbPath);
×
2475
    if (code != 0) {
×
2476
      code = TAOS_SYSTEM_ERROR(errno);
×
2477
      stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code));
×
2478
      taosMemoryFree(statePath);
×
2479
      taosMemoryFree(dbPath);
×
2480
      return code;
×
2481
    }
2482
  }
2483

2484
  *dbFullPath = dbPath;
×
2485
  *stateFullPath = statePath;
×
2486
  return 0;
×
2487
}
2488

2489
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
×
2490
  STaskDbWrapper* p = pTaskDb;
×
2491
  TAOS_UNUSED(streamMutexLock(&p->mutex));
×
2492
  p->chkpId = chkpId;
×
2493
  TAOS_UNUSED(streamMutexUnlock(&p->mutex));
×
2494
}
×
2495

2496
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
2,727✔
2497
  char*   err = NULL;
2,727✔
2498
  char**  cfNames = NULL;
2,727✔
2499
  size_t  nCf = 0;
2,727✔
2500
  int32_t code = 0;
2,727✔
2501
  int32_t lino = 0;
2,727✔
2502

2503
  STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper));
2,727✔
2504
  TSDB_CHECK_NULL(pTaskDb, code, lino, _EXIT, terrno);
2,727!
2505

2506
  pTaskDb->idstr = key ? taosStrdup(key) : NULL;
2,727!
2507
  pTaskDb->path = statePath ? taosStrdup(statePath) : NULL;
2,727!
2508

2509
  code = taosThreadMutexInit(&pTaskDb->mutex, NULL);
2,727✔
2510
  TSDB_CHECK_CODE(code, lino, _EXIT);
2,727!
2511

2512
  taskDbInitChkpOpt(pTaskDb);
2,727✔
2513
  taskDbInitOpt(pTaskDb);
2,727✔
2514

2515
  cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
2,727✔
2516
  if (nCf == 0) {
2,727✔
2517
    stInfo("%s newly create db in state-backend", key);
2,710!
2518
    // pre create db
2519
    pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
2,710✔
2520
    if (pTaskDb->db == NULL) {
2,710!
2521
      stError("%s open state-backend failed, reason:%s", key, err);
×
2522
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
2523
      goto _EXIT;
×
2524
    }
2525

2526
    rocksdb_close(pTaskDb->db);
2,710✔
2527
    pTaskDb->db = NULL;
2,710✔
2528

2529
    if (cfNames != NULL) {
2,710!
2530
      rocksdb_list_column_families_destroy(cfNames, nCf);
2,710✔
2531
    }
2532

2533
    taosMemoryFree(err);
2,710✔
2534
    err = NULL;
2,710✔
2535

2536
    cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
2,710✔
2537
    if (err != NULL) {
2,710!
2538
      stError("%s failed to create column-family, %s, %" PRIzu ", reason:%s", key, dbPath, nCf, err);
×
2539
      code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
2540
      goto _EXIT;
×
2541
    }
2542
  }
2543

2544
  if ((code = taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf)) != 0) {
2,727!
2545
    goto _EXIT;
×
2546
  }
2547

2548
  if (cfNames != NULL) {
2,727!
2549
    rocksdb_list_column_families_destroy(cfNames, nCf);
2,727✔
2550
    cfNames = NULL;
2,727✔
2551
  }
2552

2553
  stDebug("init s-task backend in:%s, backend:%p, %s", dbPath, pTaskDb, key);
2,727✔
2554
  return pTaskDb;
2,727✔
2555

2556
_EXIT:
×
2557
  stError("%s taskDb open failed, %s at line:%d code:%s", key, __func__, lino, tstrerror(code));
×
2558

2559
  taskDbDestroy(pTaskDb, false);
×
2560
  if (err) taosMemoryFree(err);
×
2561
  if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);
×
2562
  return NULL;
×
2563
}
2564

2565
/*
 * Restore the checkpoint data of a task, open its backend and read the
 * extra info stored next to it.
 *
 * processVer - also passed to restoreCheckpointData(); set to the version
 *              read by chkpLoadExtraInfo() on success
 * ppTaskDb   - receives the opened wrapper, or NULL when the backend could
 *              not be opened; left untouched on the two early-return failures
 *
 * Returns 0 on success, otherwise an error code.
 */
int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb) {
  char* statePath = NULL;
  char* dbPath = NULL;
  int   code = 0;
  if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) {
    stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key,
            chkptId, tstrerror(code));
    return code;
  }

  STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath);
  if (pTaskDb == NULL) {
    code = TSDB_CODE_INVALID_PARA;
  } else {
    int64_t chkpId = -1, ver = -1;
    code = chkpLoadExtraInfo(dbPath, &chkpId, &ver);
    if (code != 0) {
      stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId,
              tstrerror(code));
      taskDbDestroy(pTaskDb, false);
      // the two path strings are ours on this path too, do not leak them
      taosMemoryFree(dbPath);
      taosMemoryFree(statePath);
      return code;
    }
    *processVer = ver;
  }

  taosMemoryFree(dbPath);
  taosMemoryFree(statePath);
  *ppTaskDb = pTaskDb;
  return code;
}
2595

2596
void taskDbDestroy(void* pDb, bool flush) {
2,629✔
2597
  STaskDbWrapper* wrapper = pDb;
2,629✔
2598
  if (wrapper == NULL) return;
2,629!
2599

2600
  streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr);
2,629✔
2601

2602
  stDebug("succ to destroy stream backend:%p", wrapper);
2,629✔
2603

2604
  int8_t nCf = tListLen(ginitDict);
2,629✔
2605
  if (flush && wrapper->removeAllFiles == 0) {
2,629!
2606
    if (wrapper->db && wrapper->pCf) {
930!
2607
      rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
930✔
2608
      rocksdb_flushoptions_set_wait(flushOpt, 1);
930✔
2609

2610
      char*                            err = NULL;
930✔
2611
      rocksdb_column_family_handle_t** cfs = taosMemoryCalloc(1, sizeof(rocksdb_column_family_handle_t*) * nCf);
930✔
2612
      int                              numOfFlushCf = 0;
930✔
2613
      for (int i = 0; i < nCf; i++) {
7,440✔
2614
        if (wrapper->pCf[i] != NULL) {
6,510✔
2615
          cfs[numOfFlushCf++] = wrapper->pCf[i];
2,344✔
2616
        }
2617
      }
2618
      if (numOfFlushCf != 0) {
930!
2619
        rocksdb_flush_cfs(wrapper->db, flushOpt, cfs, numOfFlushCf, &err);
930✔
2620
        if (err != NULL) {
930!
2621
          stError("failed to flush all cfs, reason:%s", err);
×
2622
          taosMemoryFreeClear(err);
×
2623
        }
2624
      }
2625
      taosMemoryFree(cfs);
930✔
2626
      rocksdb_flushoptions_destroy(flushOpt);
930✔
2627
    }
2628
  }
2629

2630
  if (wrapper->pCf != NULL) {
2,629!
2631
    for (int i = 0; i < nCf; i++) {
21,030✔
2632
      if (wrapper->pCf[i] != NULL) {
18,402✔
2633
        rocksdb_column_family_handle_destroy(wrapper->pCf[i]);
6,601✔
2634
      }
2635
    }
2636
  }
2637

2638
  if (wrapper->db) {
2,628!
2639
    rocksdb_close(wrapper->db);
2,629✔
2640
    wrapper->db = NULL;
2,629✔
2641
  }
2642

2643
  rocksdb_options_destroy(wrapper->dbOpt);
2,628✔
2644
  rocksdb_readoptions_destroy(wrapper->readOpt);
2,629✔
2645
  rocksdb_writeoptions_destroy(wrapper->writeOpt);
2,629✔
2646
  rocksdb_env_destroy(wrapper->env);
2,629✔
2647
  rocksdb_cache_destroy(wrapper->cache);
2,629✔
2648

2649
  taosMemoryFree(wrapper->pCf);
2,629✔
2650
  for (int i = 0; i < nCf; i++) {
21,031✔
2651
    rocksdb_options_t*                   opt = wrapper->pCfOpts[i];
18,402✔
2652
    rocksdb_comparator_t*                compare = wrapper->pCompares[i];
18,402✔
2653
    rocksdb_block_based_table_options_t* tblOpt = wrapper->pCfParams[i].tableOpt;
18,402✔
2654

2655
    rocksdb_options_destroy(opt);
18,402✔
2656
    rocksdb_comparator_destroy(compare);
18,403✔
2657
    rocksdb_block_based_options_destroy(tblOpt);
18,403✔
2658
  }
2659
  taosMemoryFree(wrapper->pCompares);
2,629✔
2660
  taosMemoryFree(wrapper->pCfOpts);
2,629✔
2661
  taosMemoryFree(wrapper->pCfParams);
2,629✔
2662

2663
  streamMutexDestroy(&wrapper->mutex);
2,629✔
2664

2665
  taskDbDestroyChkpOpt(wrapper);
2,629✔
2666

2667
  taosMemoryFree(wrapper->idstr);
2,629✔
2668

2669
  if (wrapper->removeAllFiles) {
2,629✔
2670
    char* err = NULL;
1,699✔
2671
    stInfo("drop task remove backend dat:%s", wrapper->path);
1,699!
2672
    taosRemoveDir(wrapper->path);
1,699✔
2673
  }
2674
  taosMemoryFree(wrapper->path);
2,629✔
2675
  taosMemoryFree(wrapper);
2,629✔
2676

2677
  return;
2,629✔
2678
}
2679

2680
// Single-argument form of taskDbDestroy() that always asks for a flush.
void taskDbDestroy2(void* pDb) {
  taskDbDestroy(pDb, true);
}
2,629✔
2681

2682
/*
 * Resolve the local directory of checkpoint chkpId for an rsync upload:
 *   <pDb->path><sep>checkpoints<sep>checkpoint<chkpId>
 *
 * On success returns 0 and *path is a heap string owned by the caller
 * (taosMemoryFree). On failure *path is left untouched and an error code is
 * returned; this includes the case where the directory does not exist, which
 * used to be reported as success without setting *path.
 *
 * A reference on the wrapper (pDb->refId) is held for the duration of the call.
 */
int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
  int32_t code = 0;
  int64_t refId = pDb->refId;
  int32_t nBytes = 0;
  char*   buf = NULL;

  if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
    code = terrno;
    return code;
  }

  int32_t cap = strlen(pDb->path) + 128;

  buf = taosMemoryCalloc(1, cap);
  if (buf == NULL) {
    // read terrno before taosReleaseRef() gets a chance to change it
    code = terrno;
    goto _EXIT;
  }

  nBytes =
      snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _EXIT;
  }

  if (!taosIsDir(buf)) {
    // no such checkpoint directory: report it instead of returning 0 with *path unset
    code = TSDB_CODE_INVALID_PARA;
    goto _EXIT;
  }

  *path = buf;
  buf = NULL;  // ownership moved to the caller

_EXIT:
  taosMemoryFree(buf);
  TAOS_UNUSED(taosReleaseRef(taskDbWrapperId, refId));
  return code;
}
2718

2719
/*
 * Prepare the staging directory <pDb->path><sep>tmp<chkpId> for an s3 upload
 * and let the backend checkpoint manager fill it / the file list via
 * bkdMgtGetDelta().
 *
 * The directory is cleaned when it already exists, created otherwise.
 * Once the directory is ready, *path receives the heap allocated directory
 * name regardless of the result of bkdMgtGetDelta(), whose code is returned.
 */
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list,
                                    const char* idStr) {
  SBkdMgt* pMgt = (SBkdMgt*)bkdChkpMgt;
  int32_t  code = 0;
  int32_t  cap = strlen(pDb->path) + 32;

  char* tmpDir = taosMemoryCalloc(1, cap);
  if (tmpDir == NULL) {
    return terrno;
  }

  int32_t nBytes = snprintf(tmpDir, cap, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    taosMemoryFree(tmpDir);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  if (!taosDirExist(tmpDir)) {
    code = taosMkDir(tmpDir);
    if (code != 0) {
      taosMemoryFree(tmpDir);
      return TAOS_SYSTEM_ERROR(errno);
    }
  } else {
    cleanDir(tmpDir, idStr);
  }

  code = bkdMgtGetDelta(pMgt, pDb->idstr, chkpId, list, tmpDir);
  *path = tmpDir;

  return code;
}
2751

2752
/*
 * Produce the data to upload for checkpoint chkpId, dispatching on the
 * backup type (rsync or s3). The checkpoint is referenced through
 * taskDbRefChkp() while the work is done and unreferenced afterwards.
 *
 * Returns the code of the selected generator, or -1 for any other type.
 */
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list,
                                const char* idStr) {
  STaskDbWrapper*         pDb = arg;
  ECHECKPOINT_BACKUP_TYPE utype = type;
  int32_t                 code = -1;

  taskDbRefChkp(pDb, chkpId);
  switch (utype) {
    case DATA_UPLOAD_RSYNC:
      code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
      break;
    case DATA_UPLOAD_S3:
      code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list, idStr);
      break;
    default:
      break;
  }
  taskDbUnRefChkp(pDb, chkpId);

  return code;
}
2767

2768
/*
 * Make sure the column family named by key is opened in pDb, creating it
 * when its slot in pDb->pCf is still empty.
 *
 * Returns 0 when the column family is available, -1 when key maps to no
 * column family (getCfIdx() returned -1) or rocksdb failed to create it.
 */
int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) {
  char*  err = NULL;
  int8_t idx = getCfIdx(key);

  if (idx == -1) return -1;

  // already opened
  if (pDb->pCf[idx] != NULL) return 0;

  rocksdb_column_family_handle_t* cf =
      rocksdb_create_column_family(pDb->db, pDb->pCfOpts[idx], ginitDict[idx].key, &err);
  if (err != NULL) {
    stError("failed to open cf, key:%s, reason: %s", key, err);
    // release the handle on failure, as streamStateGetCfIdx() does, instead of leaking it
    if (cf != NULL) rocksdb_column_family_handle_destroy(cf);
    taosMemoryFree(err);
    return -1;
  }

  pDb->pCf[idx] = cf;
  return 0;
}
2789
/*
 * Copy every key/value pair of column family i from the source instance into
 * column family i of the destination task db.
 *
 * Pairs are staged in a write batch that is written out once it holds 1024
 * entries, and once more at the end for the remainder.
 *
 * Returns 0 on success, -1 as soon as a rocksdb write reports an error.
 */
int32_t copyDataAt(RocksdbCfInst* pSrc, STaskDbWrapper* pDst, int8_t i) {
  const int32_t batchLimit = 1024;
  int           code = 0;
  char*         err = NULL;

  rocksdb_readoptions_t* pRdOpt = rocksdb_readoptions_create();
  rocksdb_writebatch_t*  wb = rocksdb_writebatch_create();
  rocksdb_iterator_t*    pIter = rocksdb_create_iterator_cf(pSrc->db, pRdOpt, pSrc->pHandle[i]);

  for (rocksdb_iter_seek_to_first(pIter); rocksdb_iter_valid(pIter); rocksdb_iter_next(pIter)) {
    // batch is full: write it out before staging the next pair
    if (rocksdb_writebatch_count(wb) >= batchLimit) {
      rocksdb_write(pDst->db, pDst->writeOpt, wb, &err);
      if (err != NULL) {
        code = -1;
        goto _EXIT;
      }
      rocksdb_writebatch_clear(wb);
    }

    size_t klen = 0;
    size_t vlen = 0;
    char*  key = (char*)rocksdb_iter_key(pIter, &klen);
    char*  val = (char*)rocksdb_iter_value(pIter, &vlen);

    rocksdb_writebatch_put_cf(wb, pDst->pCf[i], key, klen, val, vlen);
  }

  // write whatever is left in the batch
  if (rocksdb_writebatch_count(wb) > 0) {
    rocksdb_write(pDst->db, pDst->writeOpt, wb, &err);
    if (err != NULL) {
      code = -1;
      goto _EXIT;
    }
  }

_EXIT:
  rocksdb_writebatch_destroy(wb);
  rocksdb_iter_destroy(pIter);
  rocksdb_readoptions_destroy(pRdOpt);
  taosMemoryFree(err);

  return code;
}
2833

2834
int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) {
×
2835
  int nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
×
2836

2837
  int32_t code = 0;
×
2838

2839
  int64_t         processVer = -1;
×
2840
  STaskDbWrapper* pTaskDb = NULL;
×
2841

2842
  code = taskDbOpen(path, key, 0, &processVer, &pTaskDb);
×
2843
  RocksdbCfInst* pSrcBackend = pCfInst;
×
2844

2845
  for (int i = 0; i < nCf; i++) {
×
2846
    rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i];
×
2847
    if (pSrcCf == NULL) continue;
×
2848

2849
    code = taskDbOpenCfByKey(pTaskDb, ginitDict[i].key);
×
2850
    if (code != 0) goto _EXIT;
×
2851

2852
    code = copyDataAt(pSrcBackend, pTaskDb, i);
×
2853
    if (code != 0) goto _EXIT;
×
2854
  }
2855

2856
_EXIT:
×
2857
  taskDbDestroy(pTaskDb, true);
×
2858

2859
  return code;
×
2860
}
2861
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) {
×
2862
  SBackendWrapper* handle = backend;
×
2863
  char*            err = NULL;
×
2864
  int64_t          streamId;
2865
  int32_t          taskId, dummy = 0;
×
2866
  char             suffix[64] = {0};
×
2867

2868
  rocksdb_options_t**              cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
×
2869
  RocksdbCfParam*                  params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
×
2870
  rocksdb_comparator_t**           pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
×
2871
  rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
×
2872

2873
  for (int i = 0; i < nCf; i++) {
×
2874
    char* cf = cfs[i];
×
2875
    char  funcname[64] = {0};
×
2876
    cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt);
×
2877
    if (i == 0) continue;
×
2878
    if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
×
2879
      rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
×
2880
      rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
×
2881
      rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
×
2882

2883
      rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
×
2884
      rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
×
2885

2886
      rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt);
×
2887
      params[i].tableOpt = tableOpt;
×
2888

2889
      int idx = streamStateGetCfIdx(NULL, funcname);
×
2890
      if (idx < 0 || idx >= sizeof(ginitDict) / sizeof(ginitDict[0])) {
×
2891
        stError("failed to open cf");
×
2892
        return -1;
×
2893
      }
2894
      SCfInit* cfPara = &ginitDict[idx];
×
2895

2896
      rocksdb_comparator_t* compare =
2897
          rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
×
2898
      rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare);
×
2899
      pCompare[i] = compare;
×
2900
    }
2901
  }
2902
  rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nCf, (const char* const*)cfs,
×
2903
                                               (const rocksdb_options_t* const*)cfOpts, cfHandle, &err);
2904
  if (err != NULL) {
×
2905
    stError("failed to open rocksdb cf, reason:%s", err);
×
2906
    taosMemoryFree(err);
×
2907
    taosMemoryFree(cfHandle);
×
2908
    taosMemoryFree(pCompare);
×
2909
    taosMemoryFree(params);
×
2910
    taosMemoryFree(cfOpts);
×
2911
    // fix other leak
2912
    return -1;
×
2913
  } else {
2914
    stDebug("succ to open rocksdb cf");
×
2915
  }
2916
  // close default cf
2917
  if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) {
×
2918
    rocksdb_column_family_handle_destroy(cfHandle[0]);
×
2919
    cfHandle[0] = NULL;
×
2920
  }
2921
  rocksdb_options_destroy(cfOpts[0]);
×
2922

2923
  handle->db = db;
×
2924

2925
  static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
2926
  for (int i = 0; i < nCf; i++) {
×
2927
    char* cf = cfs[i];
×
2928
    if (i == 0) continue;  // skip default column family, not set opt
×
2929

2930
    char funcname[64] = {0};
×
2931
    if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
×
2932
      char idstr[128] = {0};
×
2933
      sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId);
×
2934

2935
      int idx = streamStateGetCfIdx(NULL, funcname);
×
2936

2937
      RocksdbCfInst*  inst = NULL;
×
2938
      RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1);
×
2939
      if (pInst == NULL || *pInst == NULL) {
×
2940
        inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst));
×
2941
        inst->pHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
×
2942
        inst->cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
×
2943
        inst->wOpt = rocksdb_writeoptions_create();
×
2944
        inst->rOpt = rocksdb_readoptions_create();
×
2945
        inst->param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
×
2946
        inst->pBackend = handle;
×
2947
        inst->db = db;
×
2948
        inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
×
2949

2950
        inst->dbOpt = handle->dbOpt;
×
2951
        rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
×
2952
        TAOS_UNUSED(taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)));
×
2953
      } else {
2954
        inst = *pInst;
×
2955
      }
2956
      inst->cfOpt[idx] = cfOpts[i];
×
2957
      inst->pCompares[idx] = pCompare[i];
×
2958
      memcpy(&(inst->param[idx]), &(params[i]), sizeof(RocksdbCfParam));
×
2959
      inst->pHandle[idx] = cfHandle[i];
×
2960
    }
2961
  }
2962
  void* pIter = taosHashIterate(handle->cfInst, NULL);
×
2963
  while (pIter) {
×
2964
    RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
×
2965

2966
    for (int i = 0; i < cfLen; i++) {
×
2967
      if (inst->cfOpt[i] == NULL) {
×
2968
        rocksdb_options_t*                   opt = rocksdb_options_create_copy(handle->dbOpt);
×
2969
        rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
×
2970
        rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
×
2971
        rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
×
2972

2973
        rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
×
2974
        rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
×
2975

2976
        rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt);
×
2977

2978
        SCfInit* cfPara = &ginitDict[i];
×
2979

2980
        rocksdb_comparator_t* compare =
2981
            rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
×
2982
        rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
×
2983

2984
        inst->pCompares[i] = compare;
×
2985
        inst->cfOpt[i] = opt;
×
2986
        inst->param[i].tableOpt = tableOpt;
×
2987
      }
2988
    }
2989
    SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen};
×
2990
    inst->pCompareNode = streamBackendAddCompare(handle, &compare);
×
2991
    pIter = taosHashIterate(handle->cfInst, pIter);
×
2992
  }
2993

2994
  taosMemoryFree(cfHandle);
×
2995
  taosMemoryFree(pCompare);
×
2996
  taosMemoryFree(params);
×
2997
  taosMemoryFree(cfOpts);
×
2998
  return 0;
×
2999
}
3000
#ifdef BUILD_NO_CALL
3001
/*
 * Attach pState to the backend: build an SBackendCfWrapper for the state,
 * register it in the streamBackendCfWrapperId ref set and store both the ref
 * id and the pointer in pState->pTdbState.
 *
 * When handle->cfInst already holds an instance for the state's idstr, its
 * handles/options are taken over by the wrapper (and cleared in the
 * instance); otherwise fresh options, comparators and an empty handle array
 * are created for every entry of ginitDict.
 *
 * Always returns 0.
 */
int streamStateOpenBackend(void* backend, SStreamState* pState) {
  taosAcquireRef(streamBackendId, pState->streamBackendRid);
  SBackendWrapper*   handle = backend;
  SBackendCfWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper));

  streamMutexLock(&handle->cfMutex);
  RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
  if (ppInst != NULL && *ppInst != NULL) {
    // reuse what was prepared when the backend was opened
    RocksdbCfInst* inst = *ppInst;
    pWrapper->rocksdb = inst->db;
    pWrapper->pHandle = (void**)inst->pHandle;
    pWrapper->writeOpts = inst->wOpt;
    pWrapper->readOpts = inst->rOpt;
    pWrapper->cfOpts = (void**)(inst->cfOpt);
    pWrapper->dbOpt = handle->dbOpt;
    pWrapper->param = inst->param;
    pWrapper->pBackend = handle;
    pWrapper->pComparNode = inst->pCompareNode;
    streamMutexUnlock(&handle->cfMutex);
    pWrapper->backendId = pState->streamBackendRid;
    memcpy(pWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));

    int64_t id = taosAddRef(streamBackendCfWrapperId, pWrapper);
    pState->pTdbState->backendCfWrapperId = id;
    pState->pTdbState->pBackendCfWrapper = pWrapper;
    stInfo("succ to open state %p on backendWrapper, %p, %s", pState, pWrapper, pWrapper->idstr);

    // the wrapper holds these now
    inst->pHandle = NULL;
    inst->cfOpt = NULL;
    inst->param = NULL;

    inst->wOpt = NULL;
    inst->rOpt = NULL;
    return 0;
  }
  streamMutexUnlock(&handle->cfMutex);

  int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);

  RocksdbCfParam*           param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
  const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
  rocksdb_comparator_t**    pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));

  for (int i = 0; i < cfLen; i++) {
    SCfInit* cf = &ginitDict[i];

    cfOpt[i] = rocksdb_options_create_copy(handle->dbOpt);
    // refactor later
    rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
    rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
    rocksdb_block_based_options_set_partition_filters(tableOpt, 1);

    rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
    rocksdb_block_based_options_set_filter_policy(tableOpt, filter);

    rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt);
    param[i].tableOpt = tableOpt;

    rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->destroyCmp, cf->cmpKey, cf->cmpName);
    rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
    pCompare[i] = compare;
  }

  rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
  pWrapper->rocksdb = handle->db;
  pWrapper->pHandle = (void**)cfHandle;
  pWrapper->writeOpts = rocksdb_writeoptions_create();
  pWrapper->readOpts = rocksdb_readoptions_create();
  pWrapper->cfOpts = (void**)cfOpt;
  pWrapper->dbOpt = handle->dbOpt;
  pWrapper->param = param;
  pWrapper->pBackend = handle;
  pWrapper->backendId = pState->streamBackendRid;
  taosThreadRwlockInit(&pWrapper->rwLock, NULL);

  SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
  pWrapper->pComparNode = streamBackendAddCompare(handle, &compare);
  rocksdb_writeoptions_disable_WAL(pWrapper->writeOpts, 1);
  memcpy(pWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));

  int64_t id = taosAddRef(streamBackendCfWrapperId, pWrapper);
  pState->pTdbState->backendCfWrapperId = id;
  pState->pTdbState->pBackendCfWrapper = pWrapper;
  stInfo("succ to open state %p on backendWrapper %p %s", pState, pWrapper, pWrapper->idstr);
  return 0;
}
3088

3089
/*
 * Detach pState from its backend wrapper: drop the cfInst entry of the state
 * (if one is still registered), fold `remove` into wrapper->remove and
 * release the state's reference on the wrapper.
 */
void streamStateCloseBackend(SStreamState* pState, bool remove) {
  SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
  SBackendWrapper*   pHandle = wrapper->pBackend;

  stInfo("start to close state on backend: %p", pHandle);

  streamMutexLock(&pHandle->cfMutex);
  size_t          keyLen = strlen(pState->pTdbState->idstr) + 1;
  RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, keyLen);
  if (ppInst != NULL && *ppInst != NULL) {
    taosMemoryFree(*ppInst);
    taosHashRemove(pHandle->cfInst, pState->pTdbState->idstr, keyLen);
  }
  streamMutexUnlock(&pHandle->cfMutex);

  const char* action = remove ? "drop" : "close";
  stInfo("start to %s state %p on backendWrapper %p %s", action, pState, wrapper,
         wrapper->idstr);
  wrapper->remove |= remove;  // update by other pState
  taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId);
}
3110
#endif
3111
void streamStateDestroyCompar(void* arg) {
×
3112
  SCfComparator* comp = (SCfComparator*)arg;
×
3113
  for (int i = 0; i < comp->numOfComp; i++) {
×
3114
    if (comp->comp[i]) rocksdb_comparator_destroy(comp->comp[i]);
×
3115
  }
3116
  taosMemoryFree(comp->comp);
×
3117
}
×
3118

3119
/*
 * Map a column family name to its index in ginitDict.
 *
 * With pState == NULL this is a pure lookup. With a state, the column family
 * is additionally created in the state's task db (under the wrapper mutex)
 * when it has not been opened yet.
 *
 * Returns the index, or -1 when the name is unknown, the state has no
 * backend, or the column family could not be created.
 */
int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
  const size_t nameLen = strlen(funcName);
  int          idx = -1;

  for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
    if (nameLen != ginitDict[i].len) continue;
    if (strncmp(funcName, ginitDict[i].key, nameLen) == 0) {
      idx = i;
      break;
    }
  }

  // plain lookup, or nothing found: done
  if (pState == NULL || idx == -1) {
    return idx;
  }

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  if (wrapper == NULL) {
    return -1;
  }

  streamMutexLock(&wrapper->mutex);

  if (wrapper->pCf[idx] == NULL) {
    // not opened yet, create it on demand
    char*                           err = NULL;
    rocksdb_column_family_handle_t* cf =
        rocksdb_create_column_family(wrapper->db, wrapper->pCfOpts[idx], ginitDict[idx].key, &err);
    if (err == NULL) {
      stDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName);
      wrapper->pCf[idx] = cf;
    } else {
      idx = -1;
      stError("failed to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err);
      rocksdb_column_family_handle_destroy(cf);
      taosMemoryFree(err);
    }
  }
  streamMutexUnlock(&wrapper->mutex);

  return idx;
}
3155
/*
 * Position iter at the key given by buf/len using rocksdb_iter_seek(); when
 * that leaves the iterator invalid, retry with rocksdb_iter_seek_for_prev().
 *
 * Returns true when the iterator ends up valid, false otherwise.
 */
bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) {
  rocksdb_iter_seek(iter, buf, len);
  if (rocksdb_iter_valid(iter)) {
    return true;
  }

  // forward seek found nothing, try the other direction
  rocksdb_iter_seek_for_prev(iter, buf, len);
  if (rocksdb_iter_valid(iter)) {
    return true;
  }
  return false;
}
3165
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKeyName, rocksdb_snapshot_t** snapshot,
10,378✔
3166
                                          rocksdb_readoptions_t** readOpt) {
3167
  int idx = streamStateGetCfIdx(pState, cfKeyName);
10,378✔
3168

3169
  *readOpt = rocksdb_readoptions_create();
10,378✔
3170

3171
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
10,378✔
3172
  if (snapshot != NULL) {
10,378!
3173
    *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->db);
10,378✔
3174
    rocksdb_readoptions_set_snapshot(*readOpt, *snapshot);
10,378✔
3175
    rocksdb_readoptions_set_fill_cache(*readOpt, 0);
10,378✔
3176
  }
3177

3178
  return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]);
10,378✔
3179
}
3180

3181
/*
 * Write one key/value pair into the column family `funcname` of the task db
 * owned by pState.
 *
 * The key is encoded with the column family's enFunc into a 128 byte stack
 * buffer, the value with its enValueFunc into a heap buffer that is freed
 * before leaving. The result is stored in a variable named `code` that must
 * exist in the expanding scope: 0 on success, TSDB_CODE_THIRDPARTY_ERROR when
 * the column family cannot be resolved or rocksdb reports an error.
 *
 * Arguments are substituted textually, so the local names used below are
 * left as they are.
 */
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
  do { \
    code = 0; \
    char  buf[128] = {0}; \
    char* err = NULL; \
    int   i = streamStateGetCfIdx(pState, funcname); \
    if (i < 0) { \
      stWarn("streamState failed to get cf name: %s", funcname); \
      code = TSDB_CODE_THIRDPARTY_ERROR; \
      break; \
    } \
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
    TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1)); \
    char toString[128] = {0}; \
    if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString))); \
    int32_t                         klen = ginitDict[i].enFunc((void*)key, buf); \
    rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
    rocksdb_writeoptions_t*         opts = wrapper->writeOpt; \
    rocksdb_t*                      db = wrapper->db; \
    char*                           ttlV = NULL; \
    int32_t                         ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
    rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
    if (err != NULL) { \
      stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
      taosMemoryFree(err); \
      code = TSDB_CODE_THIRDPARTY_ERROR; \
    } else { \
      stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \
              ttlVLen, wrapper); \
    } \
    taosMemoryFree(ttlV); \
  } while (0);
3213

3214
/*
 * Look up `key` in the column family registered under `funcname` and decode
 * the stored value through ginitDict[i].deValueFunc.
 *
 *   pState   - stream state; its backend wrapper supplies db / read options
 *   funcname - column-family name resolved with streamStateGetCfIdx()
 *   key      - pointer to the key, encoded with ginitDict[i].enFunc
 *   pVal     - destination handed to deValueFunc as (char**)
 *   vLen     - optional pointer that receives the decoded length
 *
 * The macro expects an integer variable named `code` in the enclosing scope.
 * `code` is 0 on success and -1 when the column family is unknown, the key is
 * missing / empty, rocksdb reports an error, or deValueFunc returns <= 0.
 *
 * The raw buffer returned by rocksdb_get_cf is released on every path,
 * including the "non-NULL but zero length" case.
 */
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen)                                                   \
  do {                                                                                                                \
    code = 0;                                                                                                         \
    char  buf[128] = {0};                                                                                             \
    char* err = NULL;                                                                                                 \
    int   i = streamStateGetCfIdx(pState, funcname);                                                                  \
    if (i < 0) {                                                                                                      \
      stWarn("streamState failed to get cf name: %s", funcname);                                                      \
      code = -1;                                                                                                      \
      break;                                                                                                          \
    }                                                                                                                 \
    STaskDbWrapper* wrapper = (pState)->pTdbState->pOwner->pBackend;                                                  \
    char            toString[128] = {0};                                                                              \
    if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED((ginitDict[i].toStrFunc((void*)(key), toString)));                     \
    int32_t                         klen = ginitDict[i].enFunc((void*)(key), buf);                                    \
    rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx];     \
    rocksdb_t*                      db = wrapper->db;                                                                 \
    rocksdb_readoptions_t*          opts = wrapper->readOpt;                                                          \
    size_t                          len = 0;                                                                          \
    char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err);                       \
    if (val == NULL || len == 0) {                                                                                    \
      if (err == NULL) {                                                                                              \
        stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
      } else {                                                                                                        \
        stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err);   \
        taosMemoryFreeClear(err);                                                                                     \
      }                                                                                                               \
      taosMemoryFree(val);                                                                                            \
      code = -1;                                                                                                      \
    } else {                                                                                                          \
      int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)(pVal));                                        \
      if (tlen <= 0) {                                                                                                \
        stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr,         \
                funcname);                                                                                            \
        code = -1;                                                                                                    \
      } else {                                                                                                        \
        stTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname,     \
                tlen, wrapper);                                                                                       \
      }                                                                                                               \
      taosMemoryFree(val);                                                                                            \
      if ((vLen) != NULL) *(vLen) = tlen;                                                                             \
    }                                                                                                                 \
  } while (0);
3257

3258
/*
 * Delete `key` from the column family registered under `funcname`.
 *
 * Expects an integer variable named `code` in the enclosing scope; it is set
 * to 0 on success and to TSDB_CODE_THIRDPARTY_ERROR when the column family
 * cannot be resolved or rocksdb_delete_cf reports an error. The wrapper's
 * dataWritten counter is bumped once the column family has been resolved.
 */
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key)                                                           \
  do {                                                                                                            \
    code = 0;                                                                                                     \
    char  buf[128] = {0};                                                                                         \
    char* err = NULL;                                                                                             \
    int   i = streamStateGetCfIdx(pState, funcname);                                                              \
    if (i < 0) {                                                                                                  \
      stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname);                     \
      code = TSDB_CODE_THIRDPARTY_ERROR;                                                                          \
      break;                                                                                                      \
    }                                                                                                             \
    STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;                                                \
    TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));                                                   \
    char toString[128] = {0};                                                                                     \
    if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED(ginitDict[i].toStrFunc((void*)key, toString));                     \
    int32_t klen = ginitDict[i].enFunc((void*)key, buf);                                                          \
    rocksdb_delete_cf(wrapper->db, wrapper->writeOpt,                                                             \
                      ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx], (const char*)buf, klen, \
                      &err);                                                                                      \
    if (err == NULL) {                                                                                            \
      stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname);                  \
      break;                                                                                                      \
    }                                                                                                             \
    stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err);    \
    taosMemoryFree(err);                                                                                          \
    code = TSDB_CODE_THIRDPARTY_ERROR;                                                                            \
  } while (0);
3286

3287
// state cf
3288
/*
 * Write one entry into the "state" column family.
 *
 * The key is (*key, pState->number). When both a resultRowPut callback and an
 * expression support object are configured, the value is first converted by
 * the callback and the converted buffer is stored and then freed; otherwise
 * the caller's bytes are stored as given.
 *
 * Returns 0 on success, the callback's error code if conversion fails, or the
 * code set by STREAM_STATE_PUT_ROCKSDB.
 */
int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  int       code = 0;
  SStateKey sKey = {.key = *key, .opNum = pState->number};

  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
    char*  dst = NULL;
    size_t size = 0;

    code = pState->pResultRowStore.resultRowPut(pState->pExprSupp, value, vLen, &dst, &size);
    if (code != 0) {
      return code;
    }

    STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size);
    taosMemoryFree(dst);
    return code;
  }

  // no row converter configured: store the raw bytes
  STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, (int32_t)vLen);
  return code;
}
3306
/*
 * Read one entry from the "state" column family.
 *
 * On success *pVal / *pVLen receive either the stored bytes (no row converter
 * configured) or the output of the resultRowGet callback. In the latter case
 * the intermediate stored buffer is freed here.
 *
 * Returns 0 on success, the non-zero code set by STREAM_STATE_GET_ROCKSDB on
 * lookup failure, or the callback's return code.
 */
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  int       code = 0;
  SStateKey stateKey = {.key = *key, .opNum = pState->number};
  char*     rawVal = NULL;
  size_t    rawLen = 0;

  STREAM_STATE_GET_ROCKSDB(pState, "state", &stateKey, &rawVal, &rawLen);
  if (code != 0) {
    taosMemoryFree(rawVal);
    return code;
  }

  if (pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
    size_t rowLen = 0;
    code = pState->pResultRowStore.resultRowGet(pState->pExprSupp, rawVal, rawLen, (char**)pVal, &rowLen);
    *pVLen = (int32_t)rowLen;
    taosMemoryFree(rawVal);
  } else {
    // hand the stored buffer straight to the caller
    *pVal = rawVal;
    *pVLen = rawLen;
  }
  return code;
}
3328
/*
 * Remove the entry keyed by (*key, pState->number) from the "state" column
 * family. Returns the code set by STREAM_STATE_DEL_ROCKSDB (0 on success).
 */
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
  SStateKey stateKey = {.key = *key, .opNum = pState->number};
  int       code = 0;

  STREAM_STATE_DEL_ROCKSDB(pState, "state", &stateKey);

  return code;
}
3334
/*
 * Range-delete every "state" entry belonging to this operator
 * (opNum == pState->number), from {ts 0, group 0} up to
 * {ts INT64_MAX, group UINT64_MAX}, and compact the range afterwards.
 *
 * A failed range delete is only logged; the function always returns 0.
 */
int32_t streamStateClear_rocksdb(SStreamState* pState) {
  stDebug("streamStateClear_rocksdb");

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));

  SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
  SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number};

  char sKeyStr[128] = {0};
  char eKeyStr[128] = {0};
  int  sLen = stateKeyEncode(&sKey, sKeyStr);
  int  eLen = stateKeyEncode(&eKey, eKeyStr);

  // slot 1 of pCf is the handle used for the state range operations below
  if (wrapper->pCf[1] == NULL) {
    return 0;
  }

  char* err = NULL;
  rocksdb_delete_range_cf(wrapper->db, wrapper->writeOpt, wrapper->pCf[1], sKeyStr, sLen, eKeyStr, eLen, &err);
  if (err == NULL) {
    rocksdb_compact_range_cf(wrapper->db, wrapper->pCf[1], sKeyStr, sLen, eKeyStr, eLen);
    return 0;
  }

  char toStringStart[128] = {0};
  char toStringEnd[128] = {0};
  TAOS_UNUSED(stateKeyToString(&sKey, toStringStart));
  TAOS_UNUSED(stateKeyToString(&eKey, toStringEnd));

  stWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
  taosMemoryFree(err);
  return 0;
}
3366
/*
 * Advance the cursor by one entry. Does nothing when the cursor is NULL, has
 * no iterator, or the iterator is not positioned on a valid entry.
 */
void streamStateCurNext_rocksdb(SStreamStateCur* pCur) {
  if (pCur == NULL || pCur->iter == NULL) {
    return;
  }
  if (!rocksdb_iter_valid(pCur->iter)) {
    return;
  }
  rocksdb_iter_next(pCur->iter);
}
2,570✔
3371
/*
 * Fetch the first key of the "state" column family into *key.
 *
 * A temporary sentinel entry {ts 0, group 0} is written, a cursor is
 * positioned after it, the key under the cursor is read, and the sentinel is
 * deleted again.
 *
 * The cursor is released and the sentinel is deleted on every path after the
 * sentinel has been written, so a failed lookup neither leaks the cursor nor
 * leaves the sentinel behind.
 *
 * Returns 0 on success; otherwise the first failure among put, lookup and
 * sentinel delete.
 */
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
  stDebug("streamStateGetFirst_rocksdb");

  SWinKey tmp = {.ts = 0, .groupId = 0};
  int32_t code = streamStatePut_rocksdb(pState, &tmp, NULL, 0);
  if (code != 0) {
    return code;
  }

  SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
  // streamStateGetKVByCur_rocksdb returns -1 for a NULL cursor
  code = streamStateGetKVByCur_rocksdb(pState, pCur, key, NULL, 0);
  if (pCur != NULL) {
    streamStateFreeCur(pCur);
  }

  // always remove the sentinel, even when the lookup failed
  int32_t delCode = streamStateDel_rocksdb(pState, &tmp);
  return (code != 0) ? code : delCode;
}
3388

3389
/*
 * Read the entry under the fill cursor and accept it only if it belongs to
 * the group that pKey->groupId held on entry.
 *
 * Returns 0 when an entry of the same group was read. Returns -1 when the
 * cursor is NULL, the read fails, or the entry belongs to another group; in
 * the last case the value returned through pVal (if requested) is freed and
 * *pVal is reset to NULL.
 */
int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }

  const uint64_t wantedGroup = pKey->groupId;

  if (streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen) != 0) {
    return -1;
  }
  if (pKey->groupId == wantedGroup) {
    return 0;
  }

  // entry is from a different group: discard the value we were handed
  if (pVal != NULL) {
    taosMemoryFree((void*)*pVal);
    *pVal = NULL;
  }
  return -1;
}
3407
/*
 * Return the stored value for `key` if there is one; otherwise hand back a
 * freshly allocated, zero-filled buffer whose size is the value *pVLen held
 * on entry.
 *
 * Returns 0 on success, or terrno when the buffer cannot be allocated.
 */
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  stDebug("streamStateAddIfNotExist_rocksdb");

  // remember the requested size before the lookup may overwrite *pVLen
  int32_t wanted = *pVLen;

  int32_t found = streamStateGet_rocksdb(pState, key, pVal, pVLen);
  if (found == 0) {
    return 0;
  }

  void* fresh = taosMemoryMalloc(wanted);
  *pVal = fresh;
  if (fresh == NULL) {
    return terrno;
  }
  memset(fresh, 0, wanted);
  return 0;
}
3420
/*
 * Step the cursor back by one entry.
 *
 * Mirrors streamStateCurNext_rocksdb: nothing happens when the cursor is
 * NULL, has no iterator, or the iterator is not positioned on a valid entry,
 * so rocksdb_iter_prev is never called on a NULL or invalid iterator.
 */
void streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
  if (pCur && pCur->iter && rocksdb_iter_valid(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }
}
×
3425
/*
 * Read the key (and optionally the value) under a "state" cursor.
 *
 *   pState - may be NULL; when it carries a resultRowGet callback and an
 *            expression support object the decoded value is converted by it
 *   pCur   - cursor to read from; NULL yields -1
 *   pKey   - receives the window key of the current entry
 *   pVal   - optional; receives ownership of the (converted) value buffer
 *   pVLen  - optional; when NULL the value is not decoded at all
 *
 * Returns 0 on success, -1 when the cursor is NULL / invalid / stale, the
 * entry belongs to another operator, or the value cannot be decoded, and the
 * callback's code when conversion fails.
 */
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
                                      int32_t* pVLen) {
  if (!pCur) return -1;

  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    return -1;
  }

  SStateKey tkey;
  size_t    tlen = 0;
  char*     keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
  TAOS_UNUSED(stateKeyDecode((void*)&tkey, keyStr));
  if (tkey.opNum != pCur->number) {
    return -1;
  }

  if (pVLen != NULL) {
    size_t      vlen = 0;
    const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
    char*       val = NULL;
    // pass the ADDRESS of val so the decoder can hand back the decoded buffer;
    // the previous (char**)val cast passed a NULL destination instead
    int32_t len = valueDecode((void*)valStr, vlen, NULL, &val);
    if (len <= 0) {
      taosMemoryFree(val);
      return -1;
    }

    char*  tVal = val;
    size_t tVlen = len;

    if (pVal == NULL) {
      // caller only wants the length
      taosMemoryFree(val);
    } else if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
      int code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen);
      taosMemoryFree(val);
      if (code != 0) {
        return code;
      }
      *pVal = (char*)tVal;
    } else {
      // pState may be NULL on this path, so guard the field reads used for logging
      stInfo("streamStateGetKVByCur_rocksdb, pState = %p, pResultRowStore = %p, pExprSupp = %p", pState,
             (pState != NULL) ? (void*)pState->pResultRowStore.resultRowGet : NULL,
             (pState != NULL) ? (void*)pState->pExprSupp : NULL);
      *pVal = (char*)tVal;
    }
    *pVLen = (int32_t)tVlen;
  }

  *pKey = tkey.key;
  return 0;
}
3478
/*
 * Open a fill cursor at `key` and keep it only if the entry under it passes
 * streamStateFillGetGroupKVByCur_rocksdb (same group as key->groupId).
 * Returns the cursor on success; otherwise frees it and returns NULL.
 */
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
  SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
  if (pCur == NULL) {
    return NULL;
  }

  if (streamStateFillGetGroupKVByCur_rocksdb(pCur, key, NULL, 0) == 0) {
    return pCur;
  }

  streamStateFreeCur(pCur);
  return NULL;
}
3487

3488
/*
 * Create a "state" cursor, seek it to (*key, pState->number), skip stale
 * (TTL-expired) entries, and then step forward once unless the encoded search
 * key compares greater than the entry found.
 *
 * Returns NULL when the cursor cannot be created, the seek lands on nothing,
 * or only stale entries remain. The caller owns the returned cursor.
 *
 * NOTE(review): the session counterpart returns the cursor un-advanced when
 * the search key compares LESS than the current key; here the test is "> 0".
 * Confirm against stateKeyCmpr's ordering whether this is intended.
 */
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  pCur->number = pState->number;
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);

  SStateKey target = {.key = *key, .opNum = pState->number};
  char      encoded[128] = {0};
  int       encodedLen = stateKeyEncode((void*)&target, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // skip ttl expired data
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_next(pCur->iter);
  }

  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateKey found;
  size_t    foundLen;
  char*     foundStr = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
  TAOS_UNUSED(stateKeyDecode((void*)&found, foundStr));

  if (stateKeyCmpr(&target, sizeof(target), &found, sizeof(found)) <= 0) {
    rocksdb_iter_next(pCur->iter);
  }
  return pCur;
}
3525

3526
/*
 * Position a cursor on the last live entry of the "state" column family.
 *
 * A sentinel entry with the maximum possible key is written, an iterator is
 * seeked to it and stepped back (skipping stale entries), and the sentinel is
 * deleted again before returning.
 *
 * Returns the cursor (caller owns it), or NULL when the sentinel cannot be
 * written, the cursor or iterator cannot be created, or no live entry
 * precedes the sentinel. Once the sentinel has been written it is deleted on
 * every path, including cursor-allocation failure.
 */
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
  int32_t code = 0;

  const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
  STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
  if (code != 0) {
    return NULL;
  }

  {
    char tbuf[256] = {0};
    TAOS_UNUSED(stateKeyToString((void*)&maxStateKey, tbuf));
    stDebug("seek to last:%s", tbuf);
  }

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur != NULL) {
    pCur->number = pState->number;
    pCur->db = ((STaskDbWrapper*)pState->pTdbState->pOwner->pBackend)->db;
    pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
                                       (rocksdb_readoptions_t**)&pCur->readOpt);

    if (pCur->iter != NULL) {
      char    buf[128] = {0};
      int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
      rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
      rocksdb_iter_prev(pCur->iter);
      while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
        rocksdb_iter_prev(pCur->iter);
      }
    }

    if (pCur->iter == NULL || !rocksdb_iter_valid(pCur->iter)) {
      streamStateFreeCur(pCur);
      pCur = NULL;
    }
  }

  // remove the sentinel regardless of whether a cursor could be produced
  STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
  return pCur;
}
3565

3566
/*
 * Open a "state" cursor positioned exactly on (*key, pState->number).
 *
 * Returns the cursor (caller owns it) only when the seek lands on a live
 * entry whose key compares equal to the requested one; otherwise the cursor
 * is freed and NULL is returned.
 */
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateGetCur_rocksdb");
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateKey target = {.key = *key, .opNum = pState->number};
  char      encoded[128] = {0};
  int       encodedLen = stateKeyEncode((void*)&target, encoded);

  rocksdb_iter_seek(pCur->iter, encoded, encodedLen);

  int exactHit = 0;
  if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {
    SStateKey found;
    size_t    foundLen = 0;
    char*     foundStr = (char*)rocksdb_iter_key(pCur->iter, &foundLen);
    TAOS_UNUSED(stateKeyDecode((void*)&found, foundStr));
    exactHit = (stateKeyCmpr(&target, sizeof(target), &found, sizeof(found)) == 0);
  }

  if (!exactHit) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  pCur->number = pState->number;
  return pCur;
}
3598

3599
// func cf
3600
/*
 * Write one entry into the "func" column family.
 *
 * When both a resultRowPut callback and an expression support object are
 * configured, the value is converted by the callback first and the converted
 * buffer is stored and then freed; otherwise the caller's bytes are stored
 * as given.
 *
 * Returns 0 on success, the callback's error code if conversion fails, or the
 * code set by STREAM_STATE_PUT_ROCKSDB.
 */
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
  int code = 0;

  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
    char*  dst = NULL;
    size_t size = 0;

    code = pState->pResultRowStore.resultRowPut(pState->pExprSupp, value, vLen, &dst, &size);
    if (code == 0) {
      STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, (int32_t)size);
      taosMemoryFree(dst);
    }
    return code;
  }

  // no row converter configured: store the raw bytes
  STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, (int32_t)vLen);
  return code;
}
3617
/*
 * Read one entry from the "func" column family.
 *
 * On success *pVal / *pVLen receive either the stored bytes (no row converter
 * configured) or the output of the resultRowGet callback; in the latter case
 * the intermediate stored buffer is freed here.
 *
 * Returns 0 on success, the non-zero code set by STREAM_STATE_GET_ROCKSDB on
 * lookup failure, or the callback's return code.
 */
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
  int    code = 0;
  char*  tVal = NULL;
  size_t tValLen = 0;

  // The macro casts its 4th argument to (char**) and uses it as the decode
  // destination, so it needs the ADDRESS of tVal (as streamStateGet_rocksdb
  // passes); passing tVal itself supplied a NULL destination.
  STREAM_STATE_GET_ROCKSDB(pState, "func", key, &tVal, &tValLen);
  if (code != 0) {
    taosMemoryFree(tVal);
    return code;
  }

  if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) {
    *pVal = tVal;
    *pVLen = tValLen;
    return code;
  }

  size_t pValLen = 0;
  code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen);
  *pVLen = (int32_t)pValLen;

  taosMemoryFree(tVal);
  return code;
}
3640
/*
 * Remove `key` from the "func" column family.
 *
 * Returns the code set by STREAM_STATE_DEL_ROCKSDB (0 on success,
 * TSDB_CODE_THIRDPARTY_ERROR on failure), matching streamStateDel_rocksdb and
 * streamStateSessionDel_rocksdb instead of unconditionally reporting success.
 */
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
  return code;
}
3645

3646
// session cf
3647
/*
 * Write one entry into the "sess" column family under (*key, pState->number).
 *
 * A NULL or zero-length value is logged as an error but the write is still
 * attempted. When both a resultRowPut callback and an expression support
 * object are configured, the value is converted first and the converted
 * buffer is stored and then freed.
 *
 * Returns 0 on success, the callback's error code if conversion fails, or the
 * code set by STREAM_STATE_PUT_ROCKSDB.
 */
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
  int              code = 0;
  SStateSessionKey sKey = {.key = *key, .opNum = pState->number};

  if (value == NULL || vLen == 0) {
    stError("streamStateSessionPut_rocksdb val: %p, len: %d", value, vLen);
  }

  if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
    char*  dst = NULL;
    size_t size = 0;

    code = pState->pResultRowStore.resultRowPut(pState->pExprSupp, value, vLen, &dst, &size);
    if (code == 0) {
      STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, (int32_t)size);
      taosMemoryFree(dst);
    }
    return code;
  }

  // no row converter configured: store the raw bytes
  STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, (void*)value, (int32_t)vLen);
  return code;
}
3669
/*
 * Look up the session whose start key equals key->win.skey.
 *
 * A cursor is opened with streamStateSessionSeekKeyCurrentNext_rocksdb and
 * the entry under it is read. On a match *key is overwritten with the stored
 * session key, and *pVal / *pVLen (when non-NULL) receive the value and its
 * length; the caller then owns *pVal. Otherwise -1 is returned and any value
 * that was read is freed. The cursor is always released.
 */
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
  stDebug("streamStateSessionGet_rocksdb");

  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
  SSessionKey      found = *key;
  void*            payload = NULL;
  int32_t          payloadLen = 0;

  int code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &found, &payload, &payloadLen);
  if (code != 0 || key->win.skey != found.win.skey) {
    code = -1;
  } else {
    *key = found;
    if (pVal != NULL) {
      // ownership moves to the caller; keep the free below from releasing it
      *pVal = payload;
      payload = NULL;
    }
    if (pVLen != NULL) {
      *pVLen = payloadLen;
    }
  }

  taosMemoryFree(payload);
  streamStateFreeCur(pCur);
  return code;
}
3694

3695
/*
 * Remove the entry keyed by (*key, pState->number) from the "sess" column
 * family. Returns the code set by STREAM_STATE_DEL_ROCKSDB (0 on success).
 */
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) {
  SStateSessionKey sessKey = {.key = *key, .opNum = pState->number};
  int              code = 0;

  STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sessKey);

  return code;
}
3701

3702
/*
 * Position a cursor on the last live "sess" entry that sorts before the
 * maximum session key of `groupId`.
 *
 * A sentinel entry {groupId, skey/ekey = INT64_MAX} is written, an iterator
 * is seeked to it and stepped back (skipping stale entries), and the sentinel
 * is deleted again before returning.
 *
 * Returns the cursor (caller owns it), or NULL when the sentinel cannot be
 * written, the cursor or iterator cannot be created, or no live entry
 * precedes the sentinel. The cursor allocation is checked before use, and
 * the sentinel is deleted on every path after it has been written.
 */
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, int64_t groupId) {
  stDebug("streamStateSessionSeekToLast_rocksdb");

  int32_t code = 0;

  SSessionKey      maxSessionKey = {.groupId = groupId, .win = {.skey = INT64_MAX, .ekey = INT64_MAX}};
  SStateSessionKey maxKey = {.key = maxSessionKey, .opNum = pState->number};

  STREAM_STATE_PUT_ROCKSDB(pState, "sess", &maxKey, "", 0);
  if (code != 0) {
    return NULL;
  }
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur != NULL) {
    pCur->number = pState->number;
    pCur->db = wrapper->db;
    pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                       (rocksdb_readoptions_t**)&pCur->readOpt);

    if (pCur->iter != NULL) {
      char    buf[128] = {0};
      int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf);
      rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
      rocksdb_iter_prev(pCur->iter);
      while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
        rocksdb_iter_prev(pCur->iter);
      }
    }

    if (pCur->iter == NULL || !rocksdb_iter_valid(pCur->iter)) {
      streamStateFreeCur(pCur);
      pCur = NULL;
    }
  }

  // remove the sentinel regardless of whether a cursor could be produced
  STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey);
  return pCur;
}
3738

3739
/*
 * Step a session cursor back by one entry.
 *
 * Returns 0 after stepping, or -1 when the cursor is NULL, has no iterator,
 * or the iterator is not positioned on a valid entry (rocksdb_iter_prev is
 * not called in those cases). The debug message now names this function
 * rather than streamStateCurPrev_rocksdb.
 */
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) {
  stDebug("streamStateSessionCurPrev_rocksdb");
  if (!pCur) return -1;

  if (pCur->iter == NULL || !rocksdb_iter_valid(pCur->iter)) {
    return -1;
  }

  rocksdb_iter_prev(pCur->iter);
  return 0;
}
3746

3747
/*
 * Open a "sess" cursor on the entry at or before (*key, pState->number).
 *
 * After seeking, stale entries are skipped backwards. If the search key
 * compares >= the entry found, the cursor is returned there; otherwise it is
 * moved back one entry. Returns NULL (cursor freed) whenever the cursor or
 * iterator cannot be created or the iterator ends up invalid. The caller
 * owns the returned cursor.
 */
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");

  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  pCur->number = pState->number;
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  if (pCur->iter == NULL) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  char             encoded[128] = {0};
  int              encodedLen = stateSessionKeyEncode(&target, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // walk backwards past TTL-expired entries
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  size_t           foundLen;
  const char*      foundStr = rocksdb_iter_key(pCur->iter, &foundLen);
  SStateSessionKey found = {0};
  TAOS_UNUSED(stateSessionKeyDecode(&found, (char*)foundStr));

  if (stateSessionKeyCmpr(&target, sizeof(target), &found, sizeof(found)) < 0) {
    // landed past the requested key: step back one entry
    if (rocksdb_iter_valid(pCur->iter)) {
      rocksdb_iter_prev(pCur->iter);
    }
    if (!rocksdb_iter_valid(pCur->iter)) {
      streamStateFreeCur(pCur);
      return NULL;
    }
  }
  return pCur;
}
3798
/*
 * Open a "sess" cursor on the entry at or after (*key, pState->number).
 *
 * Returns NULL (cursor freed) when the cursor cannot be created, the seek
 * lands on nothing, or the entry found is stale. If the search key compares
 * <= the entry found the cursor is returned there; otherwise it is advanced
 * once and returned only if still valid. The caller owns the returned cursor.
 */
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  char             encoded[128] = {0};
  int              encodedLen = stateSessionKeyEncode(&target, encoded);

  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen) || iterValueIsStale(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  size_t           foundLen;
  const char*      foundStr = rocksdb_iter_key(pCur->iter, &foundLen);
  SStateSessionKey found = {0};
  TAOS_UNUSED(stateSessionKeyDecode(&found, (char*)foundStr));

  if (stateSessionKeyCmpr(&target, sizeof(target), &found, sizeof(found)) > 0) {
    // entry sorts before the requested key: move forward one entry
    rocksdb_iter_next(pCur->iter);
    if (!rocksdb_iter_valid(pCur->iter)) {
      streamStateFreeCur(pCur);
      return NULL;
    }
  }
  return pCur;
}
3835

3836
/*
 * Open a "sess" cursor on the first live entry strictly after
 * (*key, pState->number).
 *
 * After seeking, stale entries are skipped forwards. If the search key
 * compares < the entry found the cursor is returned there; otherwise it is
 * advanced once. Returns NULL (cursor freed) when the cursor cannot be
 * created or the iterator ends up invalid. The caller owns the cursor.
 */
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyNext_rocksdb");
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  char             encoded[128] = {0};
  int              encodedLen = stateSessionKeyEncode(&target, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // walk forwards past TTL-expired entries
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_next(pCur->iter);
  }
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  size_t           foundLen;
  const char*      foundStr = rocksdb_iter_key(pCur->iter, &foundLen);
  SStateSessionKey found = {0};
  TAOS_UNUSED(stateSessionKeyDecode(&found, (char*)foundStr));

  if (stateSessionKeyCmpr(&target, sizeof(target), &found, sizeof(found)) >= 0) {
    // entry is not yet past the requested key: move forward one entry
    rocksdb_iter_next(pCur->iter);
    if (!rocksdb_iter_valid(pCur->iter)) {
      streamStateFreeCur(pCur);
      return NULL;
    }
  }
  return pCur;
}
3875

3876
/*
 * Open a cursor on the "sess" column family and place it before `key`.
 *
 * Mirror image of the "Next" variant: after the initial seek, stale values are
 * stepped over backwards. If the entry found compares smaller than the search
 * key the cursor is returned there, otherwise it is moved back by one entry.
 *
 * Returns NULL when the cursor cannot be created or the iterator runs out of
 * entries; on those paths the cursor has already been released.
 */
SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
  stDebug("streamStateSessionSeekKeyPrev_rocksdb");
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) return NULL;

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  char             seekBuf[128] = {0};
  int              seekLen = stateSessionKeyEncode(&target, seekBuf);

  if (!streamStateIterSeekAndValid(pCur->iter, seekBuf, seekLen)) goto _notFound;

  // step over stale values, moving backwards
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }
  if (!rocksdb_iter_valid(pCur->iter)) goto _notFound;

  {
    size_t           klen;
    const char*      rawKey = rocksdb_iter_key(pCur->iter, &klen);
    SStateSessionKey found = {0};
    TAOS_UNUSED(stateSessionKeyDecode(&found, (char*)rawKey));
    // already before the search key: nothing more to do
    if (stateSessionKeyCmpr(&target, sizeof(target), &found, sizeof(found)) > 0) return pCur;
  }

  rocksdb_iter_prev(pCur->iter);
  if (rocksdb_iter_valid(pCur->iter)) return pCur;

_notFound:
  streamStateFreeCur(pCur);
  return NULL;
}
3915

3916
/*
 * Read the session key and value the cursor currently points at.
 *
 * Returns -1 when the cursor is NULL, the iterator is invalid or stale, the value
 * cannot be decoded, the entry's opNum differs from pCur->number, or pKey->groupId
 * is non-zero and differs from the entry's group. A non-zero status from the
 * optional resultRowGet conversion is returned unchanged.
 *
 * On success *pKey receives the decoded key, *pVal (when requested) a heap buffer
 * holding the value and *pVLen (when requested) its length. *pVal and *pVLen are
 * reset to NULL / 0 once the iterator has been found usable.
 */
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey,
                                             void** pVal, int32_t* pVLen) {
  if (pCur == NULL) return -1;
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) return -1;

  SStateSessionKey decoded = {0};
  size_t           rawKeyLen = 0;
  size_t           rawValLen = 0;

  const char* rawKey = rocksdb_iter_key(pCur->iter, (size_t*)&rawKeyLen);
  TAOS_UNUSED(stateSessionKeyDecode((void*)&decoded, (char*)rawKey));

  if (pVal != NULL) *pVal = NULL;
  if (pVLen != NULL) *pVLen = 0;

  const char* rawVal = rocksdb_iter_value(pCur->iter, (size_t*)&rawValLen);
  char*       val = NULL;
  int32_t     len = valueDecode((void*)rawVal, rawValLen, NULL, &val);

  // every rejection releases the decoded buffer and reports -1
  if (len < 0 || decoded.opNum != pCur->number ||
      (pKey->groupId != 0 && pKey->groupId != decoded.key.groupId)) {
    taosMemoryFree(val);
    return -1;
  }

  size_t outLen = len;
  if (pVal == NULL) {
    // caller is not interested in the value
    taosMemoryFree(val);
  } else if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
    char* converted = val;
    int   code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&converted,
                                                        (size_t*)&outLen);
    taosMemoryFree(val);
    if (code != 0) {
      return code;
    }
    *pVal = (char*)converted;
  } else {
    *pVal = (char*)val;
  }

  if (pVLen != NULL) *pVLen = (int32_t)outLen;

  *pKey = decoded.key;
  return 0;
}
3975
// fill cf
3976
// Write `value` (vLen bytes) under `key` into the "fill" column family.
// Returns `code`, which STREAM_STATE_PUT_ROCKSDB is expected to update on failure (macro not visible here).
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
  int code = 0;
  STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen);
  return code;
}
3982

3983
// Look up `key` in the "fill" column family; the value and its length come back through pVal / pVLen.
// Returns `code`, which STREAM_STATE_GET_ROCKSDB is expected to update on failure (macro not visible here).
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
  int code = 0;
  STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
  return code;
}
3989
// Remove `key` from the "fill" column family.
// Returns `code`, which STREAM_STATE_DEL_ROCKSDB is expected to update on failure (macro not visible here).
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "fill", key);
  return code;
}
3994

3995
/*
 * Open a cursor on the "fill" column family positioned exactly on `key`.
 *
 * Returns the cursor only when the seek lands on a live (non-stale) entry whose
 * decoded key compares equal to `key`; in every other case the cursor is released
 * and NULL is returned.
 */
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateFillGetCur_rocksdb");
  SStreamStateCur* pCur = createStreamStateCursor();
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
  if (pCur == NULL) return NULL;

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  char seekBuf[128] = {0};
  int  seekLen = winKeyEncode((void*)key, seekBuf);

  bool exactHit = false;
  if (streamStateIterSeekAndValid(pCur->iter, seekBuf, seekLen) && !iterValueIsStale(pCur->iter) &&
      rocksdb_iter_valid(pCur->iter)) {
    size_t  kLen;
    SWinKey found;
    char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &kLen);
    TAOS_UNUSED(winKeyDecode((void*)&found, rawKey));
    exactHit = (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) == 0);
  }

  if (exactHit) {
    return pCur;
  }
  streamStateFreeCur(pCur);
  return NULL;
}
4031
/*
 * Read the window key and value at the cursor's current position.
 *
 * Returns -1 when the cursor is NULL, the iterator is invalid or stale, or the
 * value cannot be decoded; otherwise 0 with *pKey set, the decoded value stored
 * through pVal and, when pVLen is non-NULL, its length.
 */
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) return -1;
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) return -1;

  SWinKey decoded;
  size_t  rawKeyLen, rawValLen;

  char* rawKey = (char*)rocksdb_iter_key(pCur->iter, &rawKeyLen);
  TAOS_UNUSED(winKeyDecode(&decoded, rawKey));

  const char* rawVal = rocksdb_iter_value(pCur->iter, &rawValLen);
  int32_t     len = valueDecode((void*)rawVal, rawValLen, NULL, (char**)pVal);
  if (len < 0) return -1;

  if (pVLen != NULL) {
    *pVLen = len;
  }
  *pKey = decoded;
  return 0;
}
4053

4054
/*
 * Open a cursor on the "fill" column family and place it after `key`.
 *
 * After the initial seek, stale values are stepped over forwards. When the entry
 * found does not compare greater than `key` the iterator is advanced once more;
 * the cursor is returned without re-checking validity after that final step.
 * NULL is returned when the cursor cannot be created, the seek fails, or only
 * stale entries remain.
 */
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateFillSeekKeyNext_rocksdb");
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) return NULL;

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  char seekBuf[128] = {0};
  int  seekLen = winKeyEncode((void*)key, seekBuf);
  if (!streamStateIterSeekAndValid(pCur->iter, seekBuf, seekLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // skip stale data
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_next(pCur->iter);
  }
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SWinKey found;
  size_t  kLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &kLen);
  TAOS_UNUSED(winKeyDecode((void*)&found, rawKey));
  if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) >= 0) {
    // landed on or before the search key: move one entry forward
    rocksdb_iter_next(pCur->iter);
  }
  return pCur;
}
4092
/*
 * Open a cursor on the "fill" column family and place it before `key`.
 *
 * After the initial seek, stale values are stepped over backwards. When the entry
 * found does not compare smaller than `key` the iterator is moved back once more;
 * the cursor is returned without re-checking validity after that final step.
 * NULL is returned when the cursor cannot be created, the seek fails, or only
 * stale entries remain.
 */
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateFillSeekKeyPrev_rocksdb");
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) return NULL;

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  char seekBuf[128] = {0};
  int  seekLen = winKeyEncode((void*)key, seekBuf);
  if (!streamStateIterSeekAndValid(pCur->iter, seekBuf, seekLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // skip stale data, moving backwards
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_prev(pCur->iter);
  }
  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SWinKey found;
  size_t  kLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &kLen);
  TAOS_UNUSED(winKeyDecode((void*)&found, rawKey));
  if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) <= 0) {
    // landed on or after the search key: move one entry back
    rocksdb_iter_prev(pCur->iter);
  }
  return pCur;
}
4130

4131
// Seek the "fill" column family using the largest representable window key
// (groupId = UINT64_MAX, ts = INT64_MAX) via the "seek key next" routine.
SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) {
  SWinKey maxKey = {.groupId = UINT64_MAX, .ts = INT64_MAX};
  return streamStateFillSeekKeyNext_rocksdb(pState, &maxKey);
}
4135

4136
#ifdef BUILD_NO_CALL
4137
/*
 * Find a stored session whose range matches `key` (per sessionRangeKeyCmpr) and
 * copy it to *curKey.
 *
 * The entry at the seek position is tried first; if it does not match, one
 * neighbour is tried: the next entry when the search key sorts after the entry
 * found, the previous one when it sorts before. Returns 0 on a match, -1 otherwise.
 */
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
  stDebug("streamStateSessionGetKeyByRange_rocksdb");
  STaskDbWrapper*  wrapper = pState->pTdbState->pOwner->pBackend;
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return -1;
  }
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  int32_t          result = -1;
  SStateSessionKey target = {.key = *key, .opNum = pState->number};
  char             seekBuf[128] = {0};
  int              seekLen = stateSessionKeyEncode(&target, seekBuf);

  if (streamStateIterSeekAndValid(pCur->iter, seekBuf, seekLen)) {
    size_t           kLen;
    const char*      rawKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
    SStateSessionKey found = {0};
    stateSessionKeyDecode(&found, (char*)rawKey);

    int32_t order = stateSessionKeyCmpr(&target, sizeof(target), &found, sizeof(found));

    SSessionKey resKey = *key;
    int32_t     code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
    bool        matched = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);

    if (!matched && order != 0) {
      // try the neighbour on the side the search key lies on
      if (order > 0) {
        streamStateCurNext_rocksdb(pCur);
      } else {
        streamStateCurPrev(pState, pCur);
      }
      code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL);
      matched = (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0);
    }

    if (matched) {
      *curKey = resKey;
      result = code;
    }
  }

  streamStateFreeCur(pCur);
  return result;
}
4194
#endif
4195

4196
/*
 * Look for an existing session that overlaps `key` widened by `gap` on both sides.
 *
 * On entry *pVLen is the size of the result buffer the caller wants. On return
 * *pVal always points at a freshly allocated buffer of that size:
 *   - return 0: a matching session was found; *key is that session's key and the
 *     buffer holds (a copy of) its stored value;
 *   - return 1: no match; *key is restored to the caller's key and the buffer is
 *     zero-filled;
 *   - terrno:   the result buffer could not be allocated.
 *
 * Any pointer already in *pVal on entry is freed, so callers must pass NULL or an
 * owned heap pointer.
 */
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                                int32_t* pVLen) {
  stDebug("streamStateSessionAddIfNotExist_rocksdb");
  // todo refactor
  int32_t     res = 0;
  SSessionKey originKey = *key;
  SSessionKey searchKey = *key;
  searchKey.win.skey = key->win.skey - gap;
  searchKey.win.ekey = key->win.ekey + gap;
  int32_t valSize = *pVLen;

  void* tmp = taosMemoryMalloc(valSize);
  if (tmp == NULL) {
    return terrno;
  }

  // first candidate: the session at or before the key
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);

  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      // never copy more than the result buffer can hold
      int32_t copyLen = (*pVLen < valSize) ? *pVLen : valSize;
      if (copyLen > 0) memcpy(tmp, *pVal, copyLen);
      taosMemoryFreeClear(*pVal);
      goto _end;
    }
    taosMemoryFreeClear(*pVal);
    streamStateCurNext_rocksdb(pCur);
  } else {
    *key = originKey;
    streamStateFreeCur(pCur);
    taosMemoryFreeClear(*pVal);
    pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
  }

  // second candidate: the session after the key
  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
  if (code == 0) {
    if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
      int32_t copyLen = (*pVLen < valSize) ? *pVLen : valSize;
      if (copyLen > 0) memcpy(tmp, *pVal, copyLen);
      goto _end;
    }
  }

  *key = originKey;
  res = 1;
  memset(tmp, 0, valSize);

_end:
  // release whatever the lookups left behind and hand over the result buffer
  taosMemoryFree(*pVal);
  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
}
4248
/*
 * Overwrite stored session values with zero bytes of the same length.
 *
 * Iteration starts at the zero session key and continues while the cursor yields
 * a non-empty value; the first read failure or empty value ends the loop.
 */
void streamStateSessionClear_rocksdb(SStreamState* pState) {
  stDebug("streamStateSessionClear_rocksdb");
  SSessionKey      startKey = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &startKey);

  for (;;) {
    SSessionKey delKey = {0};
    void*       buf = NULL;
    int32_t     size = 0;
    int32_t     code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &delKey, &buf, &size);
    if (code != 0 || size <= 0) {
      taosMemoryFreeClear(buf);
      break;
    }

    memset(buf, 0, size);
    // refactor later
    TAOS_UNUSED(streamStateSessionPut_rocksdb(pState, &delKey, buf, size));
    taosMemoryFreeClear(buf);

    streamStateCurNext_rocksdb(pCur);
  }
  streamStateFreeCur(pCur);
}
155✔
4272
/*
 * Look for an existing state window that `key` belongs to.
 *
 * A stored window matches when it fully contains the caller's window, or when
 * `fn` reports that pKeyData equals the state key held in the last keyDataLen
 * bytes of the stored value. The window at or before the key is tried first,
 * then the following one.
 *
 * On entry *pVLen is the expected value size. On return *pVal points at a fresh
 * buffer of that size:
 *   - return 0: match found, buffer holds the stored value, *key is its key;
 *   - return 1: no match, buffer is zero-filled and *key is restored;
 *   - return -1: the buffer could not be allocated.
 */
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
                                              int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
  stDebug("streamStateStateAddIfNotExist_rocksdb");
  // todo refactor
  int32_t     res = 0;
  SSessionKey callerKey = *key;
  int32_t     valSize = *pVLen;
  void*       tmp = taosMemoryMalloc(valSize);
  if (tmp == NULL) {
    return -1;
  }

  bool             found = false;
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
  if (code == 0) {
    found = (key->win.skey <= callerKey.win.skey && callerKey.win.ekey <= key->win.ekey);
    if (!found) {
      void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
      found = (fn(pKeyData, stateKey) == true);
    }
    if (!found) {
      streamStateCurNext_rocksdb(pCur);
    }
  } else {
    *key = callerKey;
    streamStateFreeCur(pCur);
    pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
  }

  if (!found) {
    // drop the first candidate's value and try the following window
    taosMemoryFreeClear(*pVal);
    code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
    if (code == 0) {
      void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
      found = (fn(pKeyData, stateKey) == true);
    }
  }

  if (found) {
    memcpy(tmp, *pVal, valSize);
  } else {
    taosMemoryFreeClear(*pVal);
    *key = callerKey;
    res = 1;
    memset(tmp, 0, valSize);
  }

  taosMemoryFreeClear(*pVal);
  *pVal = tmp;
  streamStateFreeCur(pCur);
  return res;
}
4325

4326
//  partag cf
4327
/*
 * Store a partition tag under `groupId` in the "partag" column family.
 *
 * When a resultRowPut converter and pExprSupp are configured and `tag` is not
 * NULL, the tag is converted first and the converted bytes are stored; a non-zero
 * converter status is returned without writing. Otherwise the tag bytes are
 * written as given.
 */
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
  int    code = 0;
  char*  converted = NULL;
  size_t convertedLen = 0;

  bool writeRaw = (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL || tag == NULL);
  if (!writeRaw) {
    code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, tag, tagLen, &converted, &convertedLen);
    if (code != 0) {
      return code;
    }
    STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, converted, (int32_t)convertedLen);
    taosMemoryFree(converted);
    return code;
  }

  STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
  return code;
}
4343

4344
/*
 * Initialise the caller-supplied cursor on the "partag" column family and place
 * it after `groupId`.
 *
 * The cursor's db, number and iterator fields are filled in before the column
 * family index is resolved. After the seek, stale values are stepped over
 * forwards; if the group found is not greater than `groupId` the iterator is
 * advanced once more. The function reports nothing; callers inspect the cursor.
 */
void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t groupId, SStreamStateCur* pCur) {
  if (pCur == NULL) return;

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  pCur->number = pState->number;
  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "partag", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);

  int cfIdx = streamStateGetCfIdx(pState, "partag");
  if (cfIdx < 0) {
    stError("streamState failed to put to cf name:%s", "partag");
    return;
  }

  char    seekBuf[128] = {0};
  int32_t seekLen = ginitDict[cfIdx].enFunc((void*)&groupId, seekBuf);
  if (!streamStateIterSeekAndValid(pCur->iter, seekBuf, seekLen)) return;

  // skip ttl expired data
  while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
    rocksdb_iter_next(pCur->iter);
  }
  if (!rocksdb_iter_valid(pCur->iter)) return;

  int64_t foundGroupId;
  size_t  kLen = 0;
  char*   rawKey = (char*)rocksdb_iter_key(pCur->iter, &kLen);
  TAOS_UNUSED(parKeyDecode((void*)&foundGroupId, rawKey));
  if (foundGroupId <= groupId) {
    rocksdb_iter_next(pCur->iter);
  }
}
4379

4380
/*
 * Read the group id (and optionally the tag value) at the cursor's position.
 *
 * Returns -1 when the cursor is NULL, the iterator is invalid or stale, or the
 * value cannot be decoded. On success *pGroupId holds the decoded key; when pVal
 * is non-NULL the decoded value is stored through it and, when pVLen is non-NULL,
 * its length.
 */
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) {
  // the original logged the name of the fill-cf routine here (copy-paste slip)
  stDebug("streamStateParTagGetKVByCur_rocksdb");
  if (!pCur) {
    return -1;
  }
  if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
    return -1;
  }

  size_t klen = 0;
  char*  keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
  TAOS_UNUSED(parKeyDecode((void*)pGroupId, keyStr));

  if (pVal) {
    size_t      vlen = 0;
    const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
    int32_t     len = valueDecode((void*)valStr, vlen, NULL, (char**)pVal);
    if (len < 0) {
      return -1;
    }
    if (pVLen != NULL) *pVLen = len;
  }

  return 0;
}
4405

4406
#ifdef BUILD_NO_CALL
4407
/*
 * Fetch the partition tag stored under `groupId` and convert it with resultRowGet.
 *
 * The raw stored bytes are read first; on a lookup failure the status is returned
 * as is. Otherwise the converter writes the result through tagVal, and its length
 * is narrowed into *tagLen (when tagLen is non-NULL).
 */
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
  int    code = 0;
  char*  tVal = NULL;  // must start NULL: it is freed on the error path below
  size_t tValLen = 0;
  size_t outLen = 0;   // converter writes a size_t; *tagLen is only 32 bits wide
  STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, &tVal, &tValLen);
  if (code != 0) {
    taosMemoryFree(tVal);
    return code;
  }
  code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, &outLen);
  taosMemoryFree(tVal);
  if (tagLen != NULL) *tagLen = (int32_t)outLen;

  return code;
}
4421
#endif
4422
// parname cf
4423
// Store a table name under `groupId` in the "parname" column family; always writes TSDB_TABLE_NAME_LEN bytes.
// Returns `code`, which STREAM_STATE_PUT_ROCKSDB is expected to update on failure (macro not visible here).
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
  int code = 0;
  STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN);
  return code;
}
4428
// Look up the table name stored under `groupId` in the "parname" column family.
// The value length produced by the lookup is collected in a local and discarded.
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) {
  int    code = 0;
  size_t tagLen;  // required by the macro; not reported to the caller
  STREAM_STATE_GET_ROCKSDB(pState, "parname", &groupId, pVal, &tagLen);
  return code;
}
4434

4435
// Remove the table name stored under `groupId` from the "parname" column family.
// Returns `code`, which STREAM_STATE_DEL_ROCKSDB is expected to update on failure (macro not visible here).
int32_t streamStateDeleteParName_rocksdb(SStreamState* pState, int64_t groupId) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "parname", &groupId);
  return code;
}
4440

4441
// Write `pVal` (pVLen bytes) under `key` into the "default" column family.
// Returns `code`, which STREAM_STATE_PUT_ROCKSDB is expected to update on failure (macro not visible here).
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
  int code = 0;
  STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);
  return code;
}
4446
// Look up `key` in the "default" column family; value and length come back through pVal / pVLen.
// Returns `code`, which STREAM_STATE_GET_ROCKSDB is expected to update on failure (macro not visible here).
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
  int code = 0;
  STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen);
  return code;
}
4451
// Remove `key` from the "default" column family.
// Returns `code`, which STREAM_STATE_DEL_ROCKSDB is expected to update on failure (macro not visible here).
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
  int code = 0;
  STREAM_STATE_DEL_ROCKSDB(pState, "default", key);
  return code;
}
4456

4457
/*
 * Collect the numeric suffixes of all "default" keys that start with `start`.
 *
 * `start` (and `end`, when given) are NUL-terminated strings. Iteration begins at
 * `start` and stops at the first key that sorts after `end`, or that does not
 * have `start` as a proper prefix. For every key of the form "<start>:<int64>"
 * the integer is appended to `result`. Entries whose value cannot be decoded are
 * skipped.
 *
 * Returns 0 on success, -1 when the iterator cannot be created, or terrno when
 * appending to `result` fails.
 *
 * Fixes relative to the previous version:
 *   - the value length was read through `(size_t*)` into an int32_t, overwriting
 *     adjacent stack memory on LP64;
 *   - keys returned by the iterator carry an explicit length, so they are now
 *     compared with that length instead of strlen()/strcmp();
 *   - the suffix was parsed from `key + strlen(key)` (the terminator), which can
 *     never match; it is now parsed from the bytes following the prefix.
 */
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
  int code = 0;

  STaskDbWrapper*        wrapper = pState->pTdbState->pOwner->pBackend;
  rocksdb_snapshot_t*    snapshot = NULL;
  rocksdb_readoptions_t* readopts = NULL;
  rocksdb_iterator_t*    pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
  if (pIter == NULL) {
    return -1;
  }

  const char* prefix = start;
  const char* upper = end;
  size_t      prefixLen = strlen(prefix);
  size_t      upperLen = (upper != NULL) ? strlen(upper) : 0;

  rocksdb_iter_seek(pIter, prefix, prefixLen);
  while (rocksdb_iter_valid(pIter)) {
    size_t      klen = 0;
    size_t      vlen = 0;
    const char* key = rocksdb_iter_key(pIter, &klen);
    const char* vval = rocksdb_iter_value(pIter, &vlen);
    int32_t     len = valueDecode((void*)vval, vlen, NULL, NULL);
    if (len < 0) {
      rocksdb_iter_next(pIter);
      continue;
    }

    if (upper != NULL) {
      // byte-wise "key > end", honouring the key's explicit length
      size_t common = (klen < upperLen) ? klen : upperLen;
      int    order = memcmp(key, upper, common);
      if (order > 0 || (order == 0 && klen > upperLen)) {
        break;
      }
    }

    // the key must be the prefix plus at least one more byte
    if (klen <= prefixLen || memcmp(key, prefix, prefixLen) != 0) {
      break;
    }

    // copy the suffix so sscanf sees a NUL-terminated string; ":" plus an
    // int64 in decimal needs at most 21 characters
    char   suffix[32] = {0};
    size_t suffixLen = klen - prefixLen;
    if (suffixLen < sizeof(suffix)) {
      int64_t checkPoint = 0;
      memcpy(suffix, key + prefixLen, suffixLen);
      if (sscanf(suffix, ":%" PRId64 "", &checkPoint) == 1) {
        if (taosArrayPush(result, &checkPoint) == NULL) {
          code = terrno;
          break;
        }
      }
    }
    rocksdb_iter_next(pIter);
  }
  rocksdb_release_snapshot(wrapper->db, snapshot);
  rocksdb_readoptions_destroy(readopts);
  rocksdb_iter_destroy(pIter);
  return code;
}
4502
#ifdef BUILD_NO_CALL
4503
// Create a cursor over the "default" column family. Returns NULL when the cursor
// cannot be allocated (the other cursor constructors in this file check this too).
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;
  return pCur;
}
4513
bool streamDefaultIterValid_rocksdb(void* iter) {
4514
  if (iter) {
4515
    return false;
4516
  }
4517
  SStreamStateCur* pCur = iter;
4518
  return (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) ? true : false;
4519
}
4520
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
4521
  SStreamStateCur* pCur = iter;
4522
  rocksdb_iter_seek(pCur->iter, key, strlen(key));
4523
}
4524
void streamDefaultIterNext_rocksdb(void* iter) {
4525
  SStreamStateCur* pCur = iter;
4526
  rocksdb_iter_next(pCur->iter);
4527
}
4528
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
4529
  SStreamStateCur* pCur = iter;
4530
  return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len);
4531
}
4532
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
4533
  SStreamStateCur* pCur = iter;
4534
  char*            ret = NULL;
4535

4536
  int32_t     vlen = 0;
4537
  const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vlen);
4538
  *len = valueDecode((void*)val, vlen, NULL, &ret);
4539
  if (*len < 0) {
4540
    taosMemoryFree(ret);
4541
    return NULL;
4542
  }
4543

4544
  return ret;
4545
}
4546
#endif
4547
// batch func
4548
// Create an empty RocksDB write batch and hand it back as an opaque pointer.
void* streamStateCreateBatch() {
  return rocksdb_writebatch_create();
}
4552
int32_t streamStateGetBatchSize(void* pBatch) {
116,763✔
4553
  if (pBatch == NULL) return 0;
116,763!
4554
  return rocksdb_writebatch_count(pBatch);
116,763✔
4555
}
4556

4557
void    streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
1,316✔
4558
void    streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
4,561✔
4559
/*
 * Queue one key/value pair for column family `cfKeyName` in `pBatch`.
 *
 * The key and value are encoded with the column family's enFunc / enValueFunc
 * (the value together with `ttl`) before being added. The backend's dataWritten
 * counter is incremented even when the column family name is unknown.
 *
 * Returns 0 on success, -1 when `cfKeyName` cannot be resolved.
 */
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
                            void* val, int32_t vlen, int64_t ttl) {
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));

  int cfIdx = streamStateGetCfIdx(pState, cfKeyName);
  if (cfIdx < 0) {
    stError("streamState failed to put to cf name:%s", cfKeyName);
    return -1;
  }

  char    keyBuf[128] = {0};
  int32_t keyLen = ginitDict[cfIdx].enFunc((void*)key, keyBuf);

  // the encoder allocates the output buffer; it is released right after queuing
  char*   encoded = NULL;
  int32_t encodedLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &encoded);

  rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
  rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, keyBuf, (size_t)keyLen, encoded, (size_t)encodedLen);
  taosMemoryFree(encoded);

  char keyStr[256] = {0};
  TAOS_UNUSED(ginitDict[cfIdx].toStrFunc((void*)key, keyStr));
  stTrace("streamState str: %s succ to write to %s_%s, len: %d", keyStr, wrapper->idstr, ginitDict[cfIdx].key, vlen);
  return 0;
}
4588

4589
/*
 * Queue one key/value pair for the column family at index `cfIdx` in `pBatch`.
 *
 * When a resultRowPut converter and pExprSupp are configured the value is
 * converted first (a non-zero converter status is returned immediately);
 * otherwise the bytes are used as given. `tmpBuf` is handed to enValueFunc as the
 * initial output pointer; when it is NULL the pointer left by the encoder is
 * freed here after queuing.
 *
 * Returns 0 on success.
 */
int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key,
                                    void* val, int32_t vlen, int64_t ttl, void* tmpBuf) {
  char   keyBuf[128] = {0};
  char*  payload = NULL;
  size_t payloadLen = 0;

  bool converted = (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL);
  if (converted) {
    int32_t code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &payload, &payloadLen);
    if (code != 0) {
      return code;
    }
  } else {
    payload = val;
    payloadLen = vlen;
  }

  int32_t keyLen = ginitDict[cfIdx].enFunc((void*)key, keyBuf);
  char*   encoded = tmpBuf;
  int32_t encodedLen = ginitDict[cfIdx].enValueFunc(payload, payloadLen, ttl, &encoded);

  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));

  rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
  rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, keyBuf, (size_t)keyLen, encoded, (size_t)encodedLen);

  // only the converter's output is owned here; the caller keeps `val`
  if (converted) {
    taosMemoryFree(payload);
  }
  // no scratch buffer supplied: release the pointer the encoder left behind
  if (tmpBuf == NULL) {
    taosMemoryFree(encoded);
  }

  char keyStr[256] = {0};
  TAOS_UNUSED(ginitDict[cfIdx].toStrFunc((void*)key, keyStr));
  stTrace("streamState str: %s succ to write to %s_%s", keyStr, wrapper->idstr, ginitDict[cfIdx].key);
  return 0;
}
4631
// Write the whole batch to the task's RocksDB instance with the wrapper's write options.
// Returns 0 on success; on failure the error text is logged and -1 is returned.
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
  char*           errMsg = NULL;

  TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));
  rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &errMsg);
  if (errMsg == NULL) {
    stDebug("write batch to backend:%p", wrapper->db);
    return 0;
  }

  stError("streamState failed to write batch, err:%s", errMsg);
  taosMemoryFree(errMsg);
  return -1;
}
4645
// Smallest power of two that is >= x, with a floor of 2 (inputs 0 and 1 give 2).
// For x above 2^31 the result wraps around to 0 in 32-bit arithmetic.
uint32_t nextPow2(uint32_t x) {
  if (x <= 1) {
    return 2;
  }
  uint32_t v = x - 1;
  // smear the highest set bit into every lower position
  for (unsigned shift = 1; shift < 32; shift <<= 1) {
    v |= v >> shift;
  }
  return v + 1;
}
4655

4656
#ifdef BUILD_NO_CALL
4657
int32_t copyFiles(const char* src, const char* dst) {
4658
  int32_t code = 0;
4659
  // opt later, just hard link
4660
  int32_t sLen = strlen(src);
4661
  int32_t dLen = strlen(dst);
4662
  char*   srcName = taosMemoryCalloc(1, sLen + 64);
4663
  char*   dstName = taosMemoryCalloc(1, dLen + 64);
4664

4665
  TdDirPtr pDir = taosOpenDir(src);
4666
  if (pDir == NULL) {
4667
    taosMemoryFree(srcName);
4668
    taosMemoryFree(dstName);
4669
    return -1;
4670
  }
4671

4672
  TdDirEntryPtr de = NULL;
4673
  while ((de = taosReadDir(pDir)) != NULL) {
4674
    char* name = taosGetDirEntryName(de);
4675
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
4676

4677
    sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
4678
    sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
4679
    if (!taosDirEntryIsDir(de)) {
4680
      code = taosCopyFile(srcName, dstName);
4681
      if (code == -1) {
4682
        goto _err;
4683
      }
4684
    }
4685

4686
    memset(srcName, 0, sLen + 64);
4687
    memset(dstName, 0, dLen + 64);
4688
  }
4689

4690
_err:
4691
  taosMemoryFreeClear(srcName);
4692
  taosMemoryFreeClear(dstName);
4693
  taosCloseDir(&pDir);
4694
  return code >= 0 ? 0 : -1;
4695
}
4696
#endif
4697

4698
/*
 * Tell whether a file name denotes backend metadata rather than data.
 *
 * name: file name; only the first `len` bytes are inspected, so the buffer
 *       does not have to be NUL-terminated (hash-table keys are passed here
 *       together with their stored length).
 * len:  number of valid bytes in `name`.
 *
 * Returns 1 when the name is exactly "CURRENT" or starts with "MANIFEST-",
 * otherwise 0.
 */
int32_t isBkdDataMeta(char* name, int32_t len) {
  static const char current[] = "CURRENT";
  static const char manifest[] = "MANIFEST-";
  const int32_t     currLen = (int32_t)(sizeof(current) - 1);
  const int32_t     maniLen = (int32_t)(sizeof(manifest) - 1);

  if (name == NULL || len <= 0) {
    return 0;
  }

  if (len >= maniLen && strncmp(name, manifest, (size_t)maniLen) == 0) {
    return 1;
  }

  // compare exactly `len` bytes: never read past the caller-supplied length
  if (len == currLen && memcmp(name, current, (size_t)currLen) == 0) {
    return 1;
  }

  return 0;
}
4712
/*
 * Collect the keys that are present in p2 but missing from p1.
 *
 * Keys that isBkdDataMeta() classifies as metadata are ignored. For every
 * remaining key of p2 not found in p1, a NUL-terminated heap copy is appended
 * to `diff` (array of char*); the array owns the copies.
 *
 * Returns 0 on success or the terrno captured at the failing allocation/push.
 * Entries appended before a failure stay in `diff`.
 */
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
  int32_t code = 0;
  void*   pIter = taosHashIterate(p2, NULL);

  while (pIter != NULL) {
    size_t len = 0;
    char*  name = taosHashGetKey(pIter, &len);

    if (name != NULL && !isBkdDataMeta(name, (int32_t)len) && taosHashGet(p1, name, len) == NULL) {
      // len + 1 zeroed bytes: the copy is always NUL-terminated
      char* fname = taosMemoryCalloc(1, len + 1);
      if (fname == NULL) {
        code = terrno;
        break;
      }
      memcpy(fname, name, len);

      if (taosArrayPush(diff, &fname) == NULL) {
        code = terrno;  // capture before the free below can disturb it
        taosMemoryFree(fname);
        break;
      }
    }

    pIter = taosHashIterate(p2, pIter);
  }

  if (code != 0) {
    // the loop was abandoned mid-way: release the iterator explicitly
    taosHashCancelIterate(p2, pIter);
  }
  return code;
}
4733
/*
 * Compute the two-way difference between hash tables p1 and p2.
 *
 * add: receives the keys found in p2 but not in p1.
 * del: receives the keys found in p1 but not in p2.
 *
 * The second pass runs only when the first one succeeded. Returns 0 on
 * success or the first error code reported by compareHashTableImpl().
 */
int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
  int32_t code = compareHashTableImpl(p1, p2, add);
  if (code == 0) {
    code = compareHashTableImpl(p2, p1, del);
  }

  return code;
}
4743

4744
/*
 * Render the keys of pTbl as one comma-separated string for logging.
 *
 * On return *buf points to a heap string the caller must release with
 * taosMemoryFree(); it is the empty string for an empty table and NULL if the
 * allocation failed.
 */
void hashTableToDebug(SHashObj* pTbl, char** buf) {
  // pass 1: size the buffer from the real key lengths (key + ',' each, + NUL)
  size_t cap = 1;
  void*  pIter = taosHashIterate(pTbl, NULL);
  while (pIter) {
    size_t len = 0;
    TAOS_UNUSED(taosHashGetKey(pIter, &len));
    cap += len + 1;
    pIter = taosHashIterate(pTbl, pIter);
  }

  char* p = taosMemoryCalloc(1, cap);
  if (p == NULL) {
    *buf = NULL;
    return;
  }

  // pass 2: append "key," bounded by the remaining room, so a table that grew
  // between the passes truncates the output instead of overflowing
  size_t total = 0;
  pIter = taosHashIterate(pTbl, NULL);
  while (pIter) {
    size_t len = 0;
    char*  name = taosHashGetKey(pIter, &len);
    size_t room = cap - total;
    if (name != NULL && room > 1) {
      // keys carry an explicit length, so print at most len bytes
      int n = snprintf(p + total, room, "%.*s,", (int)len, name);
      if (n > 0) {
        total += ((size_t)n < room) ? (size_t)n : room - 1;
      }
    }
    pIter = taosHashIterate(pTbl, pIter);
  }

  if (total > 0) {
    p[total - 1] = 0;  // drop the trailing comma
  }
  *buf = p;
}
×
4764
/*
 * Render an array of C strings (char*) as one comma-separated string.
 *
 * When the array is empty, or the buffer cannot be allocated, *buf is left
 * untouched, so callers should initialise it (e.g. to NULL) beforehand.
 * Otherwise *buf receives a heap string to release with taosMemoryFree().
 * NULL entries are rendered as empty strings.
 */
void strArrayDebugInfo(SArray* pArr, char** buf) {
  int32_t sz = taosArrayGetSize(pArr);
  if (sz <= 0) return;

  // size the buffer from the real string lengths (string + ',' each, + NUL)
  size_t cap = 1;
  for (int32_t i = 0; i < sz; i++) {
    char* name = taosArrayGetP(pArr, i);
    cap += (name != NULL ? strlen(name) : 0) + 1;
  }

  char* p = (char*)taosMemoryCalloc(1, cap);
  if (p == NULL) return;

  size_t total = 0;
  for (int32_t i = 0; i < sz; i++) {
    char*  name = taosArrayGetP(pArr, i);
    size_t room = cap - total;
    if (room <= 1) break;

    int n = snprintf(p + total, room, "%s,", name != NULL ? name : "");
    if (n > 0) {
      total += ((size_t)n < room) ? (size_t)n : room - 1;
    }
  }

  if (total > 0) {
    p[total - 1] = 0;  // drop the trailing comma
  }

  *buf = p;
}
4779
/*
 * Trace the checkpoint file sets of pDb: the table at index `idx`, the table
 * at index `1 - idx`, and the pending add / delete lists.
 *
 * Does nothing unless DEBUG_INFO is set in stDebugFlag. A list that produced
 * no string (empty array or allocation failure) is logged as an empty string
 * rather than passing NULL to a "%s" conversion.
 */
void dbChkpDebugInfo(SDbChkp* pDb) {
  if (!(stDebugFlag & DEBUG_INFO)) {
    return;
  }

  char* p[4] = {NULL};

  hashTableToDebug(pDb->pSstTbl[pDb->idx], &p[0]);
  stTrace("chkp previous file: [%s]", p[0] != NULL ? p[0] : "");

  hashTableToDebug(pDb->pSstTbl[1 - pDb->idx], &p[1]);
  stTrace("chkp curr file: [%s]", p[1] != NULL ? p[1] : "");

  strArrayDebugInfo(pDb->pAdd, &p[2]);
  stTrace("chkp newly addded file: [%s]", p[2] != NULL ? p[2] : "");

  strArrayDebugInfo(pDb->pDel, &p[3]);
  stTrace("chkp newly deleted file: [%s]", p[3] != NULL ? p[3] : "");

  for (int i = 0; i < 4; i++) {
    taosMemoryFree(p[i]);
  }
}
×
4800
/*
 * Scan "<path>/checkpoints/checkpoint<chkpId>" and compute which .sst files
 * were added / removed relative to the previously scanned checkpoint.
 *
 * p:      checkpoint tracker; its rwLock is held in write mode for the whole
 *         call and released on every exit path.
 * chkpId: id of the checkpoint directory to scan.
 * list:   not used by this function.
 *
 * Effects on success:
 *   - p->pCurrent / p->pManifest hold copies of the CURRENT / MANIFEST-* names
 *     found in the directory;
 *   - the table at index 1 - idx holds the .sst names, then idx is flipped;
 *   - on the first call (init == 0) every .sst name goes into p->pAdd,
 *     otherwise p->pAdd / p->pDel receive the difference between both tables;
 *   - preCkptId keeps the id that was current before this call (-1 after the
 *     first call) and curChkpId becomes chkpId.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_RANGE if the directory path does not
 * fit into p->buf, or an error code from directory / memory / hash operations.
 */
int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
  int32_t code = 0;
  int32_t nBytes = 0;
  (void)list;

  TAOS_UNUSED(taosThreadRwlockWrlock(&p->rwLock));

  p->preCkptId = p->curChkpId;
  p->curChkpId = chkpId;

  const char* pCurrent = "CURRENT";
  size_t      currLen = strlen(pCurrent);

  const char* pManifest = "MANIFEST-";
  size_t      maniLen = strlen(pManifest);

  const char* pSST = ".sst";
  size_t      sstLen = strlen(pSST);

  memset(p->buf, 0, p->len);

  nBytes =
      snprintf(p->buf, p->len, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
  if (nBytes <= 0 || nBytes >= p->len) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _EXIT;
  }

  // start from a clean slate: pending deltas and the table about to be refilled
  taosArrayClearP(p->pAdd, taosMemoryFree);
  taosArrayClearP(p->pDel, taosMemoryFree);
  taosHashClear(p->pSstTbl[1 - p->idx]);

  TdDirPtr pDir = taosOpenDir(p->buf);
  if (pDir == NULL) {
    code = terrno;
    goto _EXIT;
  }

  TdDirEntryPtr de = NULL;
  int8_t        dummy = 0;
  while ((de = taosReadDir(pDir)) != NULL) {
    char* name = taosGetDirEntryName(de);
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;

    size_t nameLen = strlen(name);

    if (nameLen == currLen && strcmp(name, pCurrent) == 0) {
      taosMemoryFreeClear(p->pCurrent);

      p->pCurrent = taosStrdup(name);
      if (p->pCurrent == NULL) {
        code = terrno;
        break;
      }
      continue;
    }

    if (nameLen >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
      taosMemoryFreeClear(p->pManifest);
      p->pManifest = taosStrdup(name);
      if (p->pManifest == NULL) {
        code = terrno;
        break;
      }
      continue;
    }

    // suffix match uses the suffix length itself instead of a literal 4
    if (nameLen >= sstLen && strncmp(name + nameLen - sstLen, pSST, sstLen) == 0) {
      int32_t ret = taosHashPut(p->pSstTbl[1 - p->idx], name, nameLen, &dummy, sizeof(dummy));
      if (ret != 0) {
        // report the failure instead of continuing with a partial table
        code = (terrno != 0) ? terrno : ret;
        break;
      }
      continue;
    }
  }
  TAOS_UNUSED(taosCloseDir(&pDir));
  if (code != 0) {
    goto _EXIT;
  }

  if (p->init == 0) {
    // first scan: everything found is "new"
    void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
    while (pIter) {
      size_t len = 0;
      char*  name = taosHashGetKey(pIter, &len);
      if (name != NULL && !isBkdDataMeta(name, (int32_t)len)) {
        char* fname = taosMemoryCalloc(1, len + 1);
        if (fname == NULL) {
          code = terrno;
          taosHashCancelIterate(p->pSstTbl[1 - p->idx], pIter);
          goto _EXIT;
        }

        memcpy(fname, name, len);
        if (taosArrayPush(p->pAdd, &fname) == NULL) {
          code = terrno;  // capture before the cleanup calls below
          taosMemoryFree(fname);
          taosHashCancelIterate(p->pSstTbl[1 - p->idx], pIter);
          goto _EXIT;
        }
      }
      pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
    }
    if (taosArrayGetSize(p->pAdd) > 0) p->update = 1;

    p->init = 1;
    p->preCkptId = -1;
    p->curChkpId = chkpId;
  } else {
    code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel);
    if (code != 0) {
      // discard the half-built delta; the lock is released at _EXIT
      taosArrayClearP(p->pAdd, taosMemoryFree);
      taosArrayClearP(p->pDel, taosMemoryFree);
      taosHashClear(p->pSstTbl[1 - p->idx]);
      p->update = 0;
      goto _EXIT;
    }

    // NOTE(review): `update` is only ever reset to 0 on this path, never set to 1 — confirm intent
    if (taosArrayGetSize(p->pAdd) == 0 && taosArrayGetSize(p->pDel) == 0) {
      p->update = 0;
    }
    // preCkptId / curChkpId were already rotated at the top of the function;
    // assigning them again here would make preCkptId equal to chkpId
  }

  dbChkpDebugInfo(p);

  // the freshly filled table becomes the reference for the next call
  p->idx = 1 - p->idx;

_EXIT:
  TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
  return code;
}
4926

4927
void dbChkpDestroy(SDbChkp* pChkp);
4928

4929
/*
 * Allocate a checkpoint tracker for the backend at `path` and run the initial
 * scan for checkpoint `initChkpId`.
 *
 * path:       backend directory; on success the tracker stores this pointer
 *             and dbChkpDestroy() frees it. On failure it is NOT freed here,
 *             so the caller still owns it.
 * initChkpId: id of the checkpoint scanned by the initial dbChkpGetDelta().
 * ppChkp:     receives the new tracker on success; untouched on failure.
 *
 * Returns 0 on success or the error code of the failing step.
 */
int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) {
  int32_t  code = 0;
  SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp));
  if (p == NULL) {
    return terrno;
  }

  p->curChkpId = initChkpId;
  p->preCkptId = -1;
  p->pSST = taosArrayInit(64, sizeof(void*));
  if (p->pSST == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->path = path;
  p->len = strlen(path) + 128;
  p->buf = taosMemoryCalloc(1, p->len);
  if (p->buf == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->idx = 0;
  p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (p->pSstTbl[0] == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (p->pSstTbl[1] == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->pAdd = taosArrayInit(64, sizeof(void*));
  if (p->pAdd == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->pDel = taosArrayInit(64, sizeof(void*));
  if (p->pDel == NULL) {
    code = terrno;
    goto _EXIT;
  }

  p->update = 0;
  if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _EXIT;
  }

  code = dbChkpGetDelta(p, initChkpId, NULL);
  if (code != 0) {
    // the lock is initialised at this point and dbChkpDestroy() does not destroy it
    TAOS_UNUSED(taosThreadRwlockDestroy(&p->rwLock));
    goto _EXIT;
  }
  *ppChkp = p;
  return code;

_EXIT:
  // detach the borrowed path so dbChkpDestroy() does not free caller memory
  p->path = NULL;
  dbChkpDestroy(p);
  return code;
}
4993

4994
/*
 * Release a checkpoint tracker together with everything it holds: the scratch
 * buffer, the path string, the three string arrays (and their elements), both
 * sst hash tables, and the CURRENT / MANIFEST name copies.
 *
 * Passing NULL is a no-op.
 */
void dbChkpDestroy(SDbChkp* pChkp) {
  if (pChkp != NULL) {
    // plain heap strings / buffers
    taosMemoryFree(pChkp->buf);
    taosMemoryFree(pChkp->path);

    // arrays of heap strings: elements are freed with the array
    taosArrayDestroyP(pChkp->pSST, taosMemoryFree);
    taosArrayDestroyP(pChkp->pAdd, taosMemoryFree);
    taosArrayDestroyP(pChkp->pDel, taosMemoryFree);

    // both generations of the sst table
    for (int i = 0; i < 2; i++) {
      taosHashCleanup(pChkp->pSstTbl[i]);
    }

    taosMemoryFree(pChkp->pCurrent);
    taosMemoryFree(pChkp->pManifest);
    taosMemoryFree(pChkp);
  }
}
5011
#ifdef BUILD_NO_CALL
5012
/*
 * Placeholder initialiser: performs no work and always reports success,
 * whether or not `p` is NULL.
 */
int32_t dbChkpInit(SDbChkp* p) {
  (void)p;
  return 0;
}
5016
#endif
5017
/*
 * Materialise the pending checkpoint delta of `p` into directory `dname`.
 *
 * Steps, in order:
 *   1. copy every file listed in p->pAdd from the current checkpoint
 *      directory into dname;
 *   2. append a heap copy of every name in p->pDel to `list` (the caller owns
 *      the copies);
 *   3. copy CURRENT and MANIFEST-* into dname, each suffixed "_<curChkpId>";
 *   4. write the META file describing them;
 *   5. clear p->pAdd / p->pDel.
 *
 * The tracker's rwLock is taken in write mode because step 5 mutates it.
 *
 * Returns 0 on success; TSDB_CODE_OUT_OF_RANGE when a path or the META content
 * does not fit its buffer; TSDB_CODE_INVALID_PARA when the checkpoint
 * directory is missing or no CURRENT / MANIFEST name has been recorded;
 * otherwise the error of the failing step. Work done before a failure is not
 * rolled back.
 */
int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
  static char* chkpMeta = "META";
  int32_t      code = 0;

  TAOS_UNUSED(taosThreadRwlockWrlock(&p->rwLock));

  int32_t cap = p->len + 128;

  // one allocation carved into four path buffers of `cap` bytes each
  char* buffer = taosMemoryCalloc(4, cap);
  if (buffer == NULL) {
    code = terrno;
    goto _ERROR;
  }

  char* srcBuf = buffer;
  char* dstBuf = &srcBuf[cap];
  char* srcDir = &dstBuf[cap];
  char* dstDir = &srcDir[cap];

  int nBytes = snprintf(srcDir, cap, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP,
                        "checkpoint", p->curChkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  nBytes = snprintf(dstDir, cap, "%s", dname);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  if (!taosDirExist(srcDir)) {
    stError("failed to dump srcDir %s, reason: not exist such dir", srcDir);
    code = TSDB_CODE_INVALID_PARA;
    goto _ERROR;
  }

  // both names are formatted with "%s" below; refuse to continue without them
  if (p->pCurrent == NULL || p->pManifest == NULL) {
    stError("failed to dump srcDir %s, reason: CURRENT or MANIFEST file not recorded", srcDir);
    code = TSDB_CODE_INVALID_PARA;
    goto _ERROR;
  }

  int64_t chkpId = 0, processId = -1;
  code = chkpLoadExtraInfo(srcDir, &chkpId, &processId);
  if (code < 0) {
    stError("failed to load extra info from %s, reason:%s", srcDir, tstrerror(code));

    goto _ERROR;
  }

  // add file to $name dir
  for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) {
    memset(srcBuf, 0, cap);
    memset(dstBuf, 0, cap);

    char* filename = taosArrayGetP(p->pAdd, i);
    nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, filename);
    if (nBytes <= 0 || nBytes >= cap) {
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _ERROR;
    }

    nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, filename);
    if (nBytes <= 0 || nBytes >= cap) {
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _ERROR;
    }

    if (taosCopyFile(srcBuf, dstBuf) < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
      goto _ERROR;
    }
  }

  // del file in $name: hand the names to the caller through `list`
  for (int i = 0; i < taosArrayGetSize(p->pDel); i++) {
    char* filename = taosArrayGetP(p->pDel, i);
    char* dup = taosStrdup(filename);
    if (dup == NULL) {
      code = terrno;
      goto _ERROR;
    }
    if (taosArrayPush(list, &dup) == NULL) {
      code = terrno;  // capture before the free below can disturb it
      taosMemoryFree(dup);
      goto _ERROR;
    }
  }

  // copy current file to dst dir
  memset(srcBuf, 0, cap);
  memset(dstBuf, 0, cap);

  nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  if (taosCopyFile(srcBuf, dstBuf) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
    goto _ERROR;
  }

  // copy manifest file to dst dir
  memset(srcBuf, 0, cap);
  memset(dstBuf, 0, cap);

  nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  if (taosCopyFile(srcBuf, dstBuf) < 0) {
    code = terrno;
    stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code));
    goto _ERROR;
  }

  // write the META file
  memset(dstBuf, 0, cap);
  nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta);
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _ERROR;
  }

  TdFilePtr pFile = taosOpenFile(dstBuf, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    code = terrno;
    stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(code));
    goto _ERROR;
  }

  char content[256] = {0};
  nBytes = tsnprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest,
                     p->curChkpId, "processVer", processId);
  if (nBytes <= 0 || nBytes >= sizeof(content)) {
    code = TSDB_CODE_OUT_OF_RANGE;
    stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir);
    TAOS_UNUSED(taosCloseFile(&pFile));
    goto _ERROR;
  }

  nBytes = taosWriteFile(pFile, content, strlen(content));
  if (nBytes != strlen(content)) {
    code = terrno;
    stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(code));
    TAOS_UNUSED(taosCloseFile(&pFile));
    goto _ERROR;
  }
  TAOS_UNUSED(taosCloseFile(&pFile));

  // clear delta data buf
  taosArrayClearP(p->pAdd, taosMemoryFree);
  taosArrayClearP(p->pDel, taosMemoryFree);
  code = 0;

_ERROR:
  taosMemoryFree(buffer);
  TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
  return code;
}
5187

5188
/*
 * Create a backend manager rooted at `path`.
 *
 * path: root directory; copied, the caller keeps ownership of its string.
 * mgt:  receives the new manager on success; untouched on failure.
 *
 * The rwlock is initialised first so that every later failure path can hand
 * the object to bkdMgtDestroy(), which unconditionally destroys that lock.
 *
 * Returns 0 on success, otherwise the error of the failing step.
 */
int32_t bkdMgtCreate(char* path, SBkdMgt** mgt) {
  int32_t  code = 0;
  SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt));
  if (p == NULL) {
    return terrno;
  }

  if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    // nothing else has been set up yet, and the lock itself is not valid
    taosMemoryFree(p);
    return code;
  }

  p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (p->pDbChkpTbl == NULL) {
    code = terrno;
    bkdMgtDestroy(p);
    return code;
  }

  p->path = taosStrdup(path);
  if (p->path == NULL) {
    code = terrno;
    bkdMgtDestroy(p);
    return code;
  }

  *mgt = p;

  return code;
}
5218

5219
/*
 * Tear down a backend manager: destroy every checkpoint tracker stored in its
 * table, then the lock, the path copy, the table itself and the manager.
 *
 * Passing NULL is a no-op.
 */
void bkdMgtDestroy(SBkdMgt* bm) {
  if (bm == NULL) {
    return;
  }

  // each table value is a pointer to a tracker
  for (void* it = taosHashIterate(bm->pDbChkpTbl, NULL); it != NULL; it = taosHashIterate(bm->pDbChkpTbl, it)) {
    SDbChkp* tracker = *(SDbChkp**)it;
    dbChkpDestroy(tracker);
  }

  TAOS_UNUSED(taosThreadRwlockDestroy(&bm->rwLock));
  taosMemoryFree(bm->path);
  taosHashCleanup(bm->pDbChkpTbl);

  taosMemoryFree(bm);
}
5235
/*
 * Compute the checkpoint delta for task `taskId` and dump it into `dname`.
 *
 * bm:     backend manager; its rwLock is held in write mode for the call.
 * taskId: NUL-terminated task id, used as table key and as sub-directory name.
 * chkpId: checkpoint to process.
 * list:   receives heap copies of the file names to delete (see dbChkpDumpTo).
 * dname:  destination directory.
 *
 * If the task has no tracker yet, one is created for "<bm->path>/<taskId>"
 * (creation performs the initial scan) and registered in the table; otherwise
 * the existing tracker is rescanned. In both cases the delta is then dumped.
 *
 * Returns 0 on success or the error code of the failing step.
 */
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* dname) {
  int32_t code = 0;
  TAOS_UNUSED(taosThreadRwlockWrlock(&bm->rwLock));

  SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
  SDbChkp*  pChkp = ppChkp != NULL ? *ppChkp : NULL;

  if (pChkp != NULL) {
    code = dbChkpGetDelta(pChkp, chkpId, NULL);
    if (code != 0) {
      goto _EXIT;
    }
  } else {
    // sized from the actual components so a long task id cannot be truncated
    int32_t cap = (int32_t)(strlen(bm->path) + strlen(TD_DIRSEP) + strlen(taskId) + 1);
    char*   path = taosMemoryCalloc(1, cap);
    if (path == NULL) {
      code = terrno;
      goto _EXIT;
    }

    int32_t nBytes = snprintf(path, cap, "%s%s%s", bm->path, TD_DIRSEP, taskId);
    if (nBytes <= 0 || nBytes >= cap) {
      taosMemoryFree(path);
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _EXIT;
    }

    SDbChkp* p = NULL;
    code = dbChkpCreate(path, chkpId, &p);
    if (code != 0) {
      // NOTE(review): assumes dbChkpCreate leaves ownership of `path` with the caller on failure — confirm
      taosMemoryFree(path);
      goto _EXIT;
    }

    int32_t ret = taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*));
    if (ret != 0) {
      // read terrno before the cleanup below has a chance to overwrite it
      code = (terrno != 0) ? terrno : ret;
      dbChkpDestroy(p);
      goto _EXIT;
    }

    pChkp = p;
  }

  code = dbChkpDumpTo(pChkp, dname, list);

_EXIT:
  TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));
  return code;
}
5286

5287
#ifdef BUILD_NO_CALL
5288
/*
 * Register a new checkpoint tracker for `task`, rooted at `path`.
 *
 * Returns 0 when a tracker was created and stored, -1 when the task already
 * has one, or the error code of dbChkpCreate() / taosHashPut(). The tracker is
 * stored only when its creation succeeded.
 */
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) {
  int32_t code = -1;

  TAOS_UNUSED(taosThreadRwlockWrlock(&bm->rwLock));
  SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
  if (pp == NULL) {
    SDbChkp* p = NULL;
    code = dbChkpCreate(path, 0, &p);
    if (code == 0) {
      int32_t ret = taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*));
      if (ret != 0) {
        code = (terrno != 0) ? terrno : ret;
        // not reachable through the table, so release it here
        dbChkpDestroy(p);
      }
    }
  } else {
    stError("task chkp already exists");
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));

  return code;
}
5308

5309
/*
 * Dump the pending delta of task `taskId` into `dname`.
 *
 * The table stores pointers to trackers, so the lookup result is dereferenced
 * once before use. Returns TSDB_CODE_INVALID_PARA when the task has no
 * tracker, otherwise the result of dbChkpDumpTo().
 *
 * NOTE(review): dbChkpDumpTo() is called with a NULL `list`; confirm that is
 * acceptable when the tracker has pending deletions.
 */
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) {
  int32_t code = 0;
  TAOS_UNUSED(taosThreadRwlockRdlock(&bm->rwLock));

  SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
  if (pp == NULL || *pp == NULL) {
    code = TSDB_CODE_INVALID_PARA;
  } else {
    code = dbChkpDumpTo(*pp, dname, NULL);
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&bm->rwLock));
  return code;
}
5319
#endif
5320

5321
/*
 * Open a cursor on the "state" column family positioned before `key`.
 *
 * The iterator is first sought to the encoded key, then stepped backwards
 * past stale values. If the entry it rests on does not compare strictly below
 * `key`, it is stepped back once more.
 *
 * Returns the cursor (release with streamStateFreeCur()), or NULL when the
 * cursor cannot be allocated, the seek does not land on a valid entry, or no
 * valid entry remains after skipping stale ones.
 */
SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
  stDebug("streamStateSeekKeyPrev_rocksdb");
  STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;

  SStreamStateCur* pCur = createStreamStateCursor();
  if (pCur == NULL) {
    return NULL;
  }

  pCur->db = wrapper->db;
  pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
                                     (rocksdb_readoptions_t**)&pCur->readOpt);
  pCur->number = pState->number;

  char encoded[128] = {0};
  int  encodedLen = winKeyEncode((void*)key, encoded);
  if (!streamStateIterSeekAndValid(pCur->iter, encoded, encodedLen)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  // walk backwards over entries whose value is stale
  for (;;) {
    if (!rocksdb_iter_valid(pCur->iter)) break;
    if (!iterValueIsStale(pCur->iter)) break;
    rocksdb_iter_prev(pCur->iter);
  }

  if (!rocksdb_iter_valid(pCur->iter)) {
    streamStateFreeCur(pCur);
    return NULL;
  }

  SWinKey found;
  size_t  rawLen = 0;
  char*   raw = (char*)rocksdb_iter_key(pCur->iter, &rawLen);
  TAOS_UNUSED(winKeyDecode((void*)&found, raw));

  // entry is not strictly below `key`: move one step further back
  if (winKeyCmpr(key, sizeof(*key), &found, sizeof(found)) <= 0) {
    rocksdb_iter_prev(pCur->iter);
  }
  return pCur;
}
5359

5360
/*
 * Read the entry under `pCur`, accepting it only if it belongs to the group
 * id that `pKey` carried on entry.
 *
 * Returns 0 when the read succeeds and the group ids match. Returns -1 when
 * `pCur` is NULL, the read fails, or the groups differ; in the mismatch case
 * the value returned by the read is freed and *pVal is reset to NULL (when
 * pVal itself is non-NULL).
 */
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return -1;
  }

  // remember the requested group: the read below overwrites *pKey
  const uint64_t wantedGroup = pKey->groupId;

  if (streamStateGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen) != 0) {
    return -1;
  }

  if (pKey->groupId == wantedGroup) {
    return 0;
  }

  if (pVal != NULL) {
    taosMemoryFree((void*)*pVal);
    *pVal = NULL;
  }
  return -1;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc