• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3548

04 Dec 2024 01:03PM UTC coverage: 59.846% (-0.8%) from 60.691%
#3548

push

travis-ci

web-flow
Merge pull request #29033 from taosdata/fix/calculate-vnode-memory-used

fix/calculate-vnode-memory-used

118484 of 254183 branches covered (46.61%)

Branch coverage included in aggregate %.

199691 of 277471 relevant lines covered (71.97%)

18794141.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.97
/source/dnode/mnode/sdb/src/sdbFile.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "crypt.h"
18
#include "sdb.h"
19
#include "sync.h"
20
#include "tchecksum.h"
21
#include "tglobal.h"
22
#include "wal.h"
23

24
#define SDB_TABLE_SIZE   24
25
#define SDB_RESERVE_SIZE 512
26
#define SDB_FILE_VER     1
27

28
#define SDB_TABLE_SIZE_EXTRA   SDB_MAX
29
#define SDB_RESERVE_SIZE_EXTRA (512 - (SDB_TABLE_SIZE_EXTRA - SDB_TABLE_SIZE) * 2 * sizeof(int64_t))
30

31
/*
 * Invoke every registered deploy callback of the sdb.
 *
 * Table types are visited from SDB_MAX - 1 down to 0; types without a
 * registered callback are skipped.
 *
 * Returns 0 when every callback succeeded, or -1 as soon as one callback
 * reports a non-zero code (the callback's code is only logged).
 */
static int32_t sdbDeployData(SSdb *pSdb) {
  mInfo("start to deploy sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbDeployFp deployFp = pSdb->deployFps[type];
    if (deployFp != NULL) {
      mInfo("start to deploy sdb:%s", sdbTableName(type));

      int32_t code = (*deployFp)(pSdb->pMnode);
      if (code != 0) {
        mError("failed to deploy sdb:%s since %s", sdbTableName(type), tstrerror(code));
        return -1;
      }
    }
  }

  mInfo("sdb deploy success");
  return 0;
}
50

51
/*
 * Drop every row held in memory and reset the sdb bookkeeping.
 *
 * For each table type that has a hash object: take the table write lock,
 * free every row, clear the hash, and zero the table version and max id.
 * Afterwards the apply/commit index, term and config are all set to -1.
 */
static void sdbResetData(SSdb *pSdb) {
  mInfo("start to reset sdb");

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    sdbWriteLock(pSdb, i);

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      // A NULL slot must still advance the iterator; a bare `continue` here
      // would re-test the same ppRow forever while holding the write lock.
      if (pRow != NULL) {
        sdbFreeRow(pSdb, pRow, true);
      }
      ppRow = taosHashIterate(hash, ppRow);
    }

    taosHashClear(pSdb->hashObjs[i]);
    pSdb->tableVer[i] = 0;
    pSdb->maxId[i] = 0;

    sdbUnLock(pSdb, i);

    mInfo("sdb:%s is reset", sdbTableName(i));
  }

  pSdb->applyIndex = -1;
  pSdb->applyTerm = -1;
  pSdb->applyConfig = -1;
  pSdb->commitIndex = -1;
  pSdb->commitTerm = -1;
  pSdb->commitConfig = -1;
  mInfo("sdb reset success");
}
480✔
86

87
/*
 * Read a single int64_t field of the sdb file head into *pValue.
 *
 * Returns 0 on success, terrno when taosReadFile reports an error, or
 * TSDB_CODE_FILE_CORRUPTED when fewer than sizeof(int64_t) bytes were read.
 */
static int32_t sdbReadHeadInt64(TdFilePtr pFile, int64_t *pValue) {
  int64_t ret = taosReadFile(pFile, pValue, sizeof(int64_t));
  if (ret < 0) {
    return terrno;
  }
  if (ret != sizeof(int64_t)) {
    int32_t code = TSDB_CODE_FILE_CORRUPTED;
    TAOS_RETURN(code);
  }
  return 0;
}

/*
 * Read and validate the head of an sdb data file.
 *
 * Layout, in read order:
 *   - file version (must equal SDB_FILE_VER)
 *   - applyIndex, applyTerm, applyConfig
 *   - SDB_TABLE_SIZE max ids, then SDB_TABLE_SIZE table versions
 *   - for each type in [SDB_TABLE_SIZE, SDB_TABLE_SIZE_EXTRA): max id, version
 *   - SDB_RESERVE_SIZE_EXTRA reserved bytes
 *
 * Values are stored into pSdb as they are read. Returns 0 on success,
 * terrno on a read error, or TSDB_CODE_FILE_CORRUPTED on a short read or a
 * version mismatch. Every read error is reported through terrno, including
 * the compatibility section.
 */
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int32_t code = 0;
  int64_t sver = 0;

  if ((code = sdbReadHeadInt64(pFile, &sver)) != 0) {
    return code;
  }
  if (sver != SDB_FILE_VER) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TAOS_RETURN(code);
  }

  if ((code = sdbReadHeadInt64(pFile, &pSdb->applyIndex)) != 0) {
    return code;
  }
  if ((code = sdbReadHeadInt64(pFile, &pSdb->applyTerm)) != 0) {
    return code;
  }
  if ((code = sdbReadHeadInt64(pFile, &pSdb->applyConfig)) != 0) {
    return code;
  }

  // The fixed part always holds SDB_TABLE_SIZE slots; slots at or beyond
  // SDB_MAX are consumed from the file but not stored.
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t maxId = 0;
    if ((code = sdbReadHeadInt64(pFile, &maxId)) != 0) {
      return code;
    }
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t ver = 0;
    if ((code = sdbReadHeadInt64(pFile, &ver)) != 0) {
      return code;
    }
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  // for sdb compatibility: the remaining types store (maxId, ver) pairs
  for (int32_t i = SDB_TABLE_SIZE; i < SDB_TABLE_SIZE_EXTRA; ++i) {
    int64_t maxId = 0;
    if ((code = sdbReadHeadInt64(pFile, &maxId)) != 0) {
      return code;
    }
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }

    int64_t ver = 0;
    if ((code = sdbReadHeadInt64(pFile, &ver)) != 0) {
      return code;
    }
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  char    reserve[SDB_RESERVE_SIZE_EXTRA] = {0};
  int64_t ret = taosReadFile(pFile, reserve, sizeof(reserve));
  if (ret < 0) {
    return terrno;
  }
  if (ret != sizeof(reserve)) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TAOS_RETURN(code);
  }

  return 0;
}
203

204
/*
 * Write one int64_t field of the sdb file head.
 * Returns true when exactly sizeof(int64_t) bytes were written.
 */
static bool sdbWriteHeadInt64(TdFilePtr pFile, int64_t value) {
  return taosWriteFile(pFile, &value, sizeof(int64_t)) == sizeof(int64_t);
}

/*
 * Write the head of an sdb data file.
 *
 * Layout, in write order:
 *   - SDB_FILE_VER
 *   - applyIndex, applyTerm, applyConfig
 *   - SDB_TABLE_SIZE max ids, then SDB_TABLE_SIZE table versions
 *     (slots at or beyond SDB_MAX are written as 0)
 *   - for each type in [SDB_TABLE_SIZE, SDB_TABLE_SIZE_EXTRA): max id, version
 *   - SDB_RESERVE_SIZE_EXTRA zero bytes
 *
 * Returns 0 on success, or terrno as soon as one write is short.
 */
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
  if (!sdbWriteHeadInt64(pFile, SDB_FILE_VER)) {
    return terrno;
  }

  mInfo("vgId:1, write sdb file with sdb applyIndex:%" PRId64 " term:%" PRId64 " config:%" PRId64, pSdb->applyIndex,
        pSdb->applyTerm, pSdb->applyConfig);
  if (!sdbWriteHeadInt64(pFile, pSdb->applyIndex)) {
    return terrno;
  }
  if (!sdbWriteHeadInt64(pFile, pSdb->applyTerm)) {
    return terrno;
  }
  if (!sdbWriteHeadInt64(pFile, pSdb->applyConfig)) {
    return terrno;
  }

  for (int32_t type = 0; type < SDB_TABLE_SIZE; ++type) {
    int64_t maxId = (type < SDB_MAX) ? pSdb->maxId[type] : 0;
    if (!sdbWriteHeadInt64(pFile, maxId)) {
      return terrno;
    }
  }

  for (int32_t type = 0; type < SDB_TABLE_SIZE; ++type) {
    int64_t ver = (type < SDB_MAX) ? pSdb->tableVer[type] : 0;
    if (!sdbWriteHeadInt64(pFile, ver)) {
      return terrno;
    }
  }

  // for sdb compatibility
  for (int32_t type = SDB_TABLE_SIZE; type < SDB_TABLE_SIZE_EXTRA; ++type) {
    int64_t maxId = (type < SDB_MAX) ? pSdb->maxId[type] : 0;
    if (!sdbWriteHeadInt64(pFile, maxId)) {
      return terrno;
    }

    int64_t ver = (type < SDB_MAX) ? pSdb->tableVer[type] : 0;
    if (!sdbWriteHeadInt64(pFile, ver)) {
      return terrno;
    }
  }

  char padding[SDB_RESERVE_SIZE_EXTRA] = {0};
  if (taosWriteFile(pFile, padding, sizeof(padding)) != sizeof(padding)) {
    return terrno;
  }

  return 0;
}
270

271
/*
 * Load "<currDir>/sdb.data" into the in-memory sdb.
 *
 * The file head is read with sdbReadFileHead, then records are read until
 * end of file. Each record is an SSdbRaw header, followed by dataLen payload
 * bytes (ENCRYPTED_LEN(dataLen) bytes when SM4 encryption of the sdb scope
 * is enabled) and an int32_t checksum. Every record is checksum-verified and
 * applied with sdbWriteWithoutFree.
 *
 * The table versions read from the head are snapshotted before the records
 * are applied and copied back afterwards (presumably because applying rows
 * changes tableVer -- confirm against sdbWriteWithoutFree).
 *
 * Returns 0 on success and also when the file cannot be opened (treated as
 * "nothing to read"); otherwise an error code.
 */
static int32_t sdbReadFileImp(SSdb *pSdb) {
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;
  char    file[PATH_MAX] = {0};
  int32_t bufLen = TSDB_MAX_MSG_SIZE;

  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mInfo("start to read sdb file:%s", file);

  SSdbRaw *pRaw = taosMemoryMalloc(bufLen + 100);
  if (pRaw == NULL) {
    code = terrno;
    mError("failed read sdb file since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    // Capture the open error before any other call gets a chance to touch terrno.
    code = terrno;
    taosMemoryFree(pRaw);
    mInfo("read sdb file:%s finished since %s", file, tstrerror(code));
    return 0;
  }

  code = sdbReadFileHead(pSdb, pFile);
  if (code != 0) {
    mError("failed to read sdb file:%s head since %s", file, tstrerror(code));
    taosMemoryFree(pRaw);
    int32_t closeCode = taosCloseFile(&pFile);
    if (closeCode != 0) {
      mError("failed to close sdb file:%s since %s", file, tstrerror(closeCode));
    }
    return code;
  }

  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

  while (1) {
    readLen = sizeof(SSdbRaw);
    ret = taosReadFile(pFile, pRaw, readLen);
    if (ret == 0) break;

    if (ret < 0) {
      code = terrno;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " != readLen:%d", file, tstrerror(code), ret, readLen);
      goto _OVER;
    }

    // dataLen comes straight from disk and is not covered by a checksum yet.
    // Reject values that would make the length arithmetic below go negative
    // or overflow int32_t when the buffer is grown to dataLen * 2 + 100.
    if (pRaw->dataLen < 0 || pRaw->dataLen > (INT32_MAX - 100) / 2) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s, invalid dataLen:%d", file, tstrerror(code), pRaw->dataLen);
      goto _OVER;
    }

    readLen = pRaw->dataLen + sizeof(int32_t);
    if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
      readLen = ENCRYPTED_LEN(pRaw->dataLen) + sizeof(int32_t);
    }
    if (readLen >= bufLen) {
      bufLen = pRaw->dataLen * 2;
      SSdbRaw *pNewRaw = taosMemoryMalloc(bufLen + 100);
      if (pNewRaw == NULL) {
        code = terrno;
        mError("failed read sdb file since malloc new sdbRaw size:%d failed", bufLen);
        goto _OVER;
      }
      mInfo("malloc new sdb raw size:%d, type:%d", bufLen, pRaw->type);
      // Only the header has been read so far; carry it over to the new buffer.
      memcpy(pNewRaw, pRaw, sizeof(SSdbRaw));
      sdbFreeRaw(pRaw);
      pRaw = pNewRaw;
    }

    ret = taosReadFile(pFile, pRaw->pData, readLen);
    if (ret < 0) {
      code = terrno;
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " readLen:%d", file, tstrerror(code), ret, readLen);
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " != readLen:%d", file, tstrerror(code), ret, readLen);
      goto _OVER;
    }

    if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
      char *plantContent = taosMemoryMalloc(ENCRYPTED_LEN(pRaw->dataLen));
      if (plantContent == NULL) {
        code = terrno;
        goto _OVER;
      }

      // Zero-initialise so no field (including the key buffer that strncpy
      // may leave unterminated) is ever read uninitialised.
      SCryptOpts opts = {0};
      opts.len = ENCRYPTED_LEN(pRaw->dataLen);
      opts.source = pRaw->pData;
      opts.result = plantContent;
      opts.unitLen = 16;
      strncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN);

      (void)CBC_Decrypt(&opts);

      // mDebug("read sdb, CBC_Decrypt dataLen:%d, descrypted len:%d, %s", pRaw->dataLen, count, __FUNCTION__);

      memcpy(pRaw->pData, plantContent, pRaw->dataLen);
      taosMemoryFree(plantContent);
      // Move the checksum from behind the encrypted payload to directly
      // behind the plain payload. Source and destination may overlap (or be
      // identical) when the padding is shorter than the checksum, so memmove
      // is required here.
      memmove(pRaw->pData + pRaw->dataLen, &pRaw->pData[ENCRYPTED_LEN(pRaw->dataLen)], sizeof(int32_t));
    }

    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) {
      code = TSDB_CODE_CHECKSUM_ERROR;
      mError("failed to read sdb file:%s since %s, readLen:%d", file, tstrerror(code), readLen);
      goto _OVER;
    }

    code = sdbWriteWithoutFree(pSdb, pRaw);
    if (code != 0) {
      mError("failed to read sdb file:%s since %s", file, terrstr());
      goto _OVER;
    }
  }

  code = 0;
  pSdb->commitIndex = pSdb->applyIndex;
  pSdb->commitTerm = pSdb->applyTerm;
  pSdb->commitConfig = pSdb->applyConfig;
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
  mInfo("vgId:1, trans:0, read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file,
        pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig);

_OVER:
  if ((ret = taosCloseFile(&pFile)) != 0) {
    mError("failed to close sdb file:%s since %s", file, tstrerror(ret));
  }
  sdbFreeRaw(pRaw);

  TAOS_RETURN(code);
}
414

415
/*
 * Reload the sdb from its data file while holding the file lock.
 *
 * The in-memory data is reset first; if loading fails it is reset again so
 * no partially loaded state is left behind. Returns the code produced by
 * sdbReadFileImp.
 */
int32_t sdbReadFile(SSdb *pSdb) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pSdb->filelock);

  sdbResetData(pSdb);
  if ((code = sdbReadFileImp(pSdb)) != 0) {
    mError("failed to read sdb file since %s", tstrerror(code));
    sdbResetData(pSdb);
  }

  (void)taosThreadMutexUnlock(&pSdb->filelock);

  return code;
}
428

429
static int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) {
2,898✔
430
  int32_t code = 0;
2,898✔
431

432
  char tmpfile[PATH_MAX] = {0};
2,898✔
433
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
2,898✔
434
  char curfile[PATH_MAX] = {0};
2,898✔
435
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
2,898✔
436

437
  mInfo("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
2,898!
438
        " term:%" PRId64 " config:%" PRId64 ", file:%s",
439
        pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
440
        curfile);
441

442
  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
2,898✔
443
  if (pFile == NULL) {
2,898!
444
    code = terrno;
×
445
    mError("failed to open sdb file:%s for write since %s", tmpfile, tstrerror(code));
×
446
    TAOS_RETURN(code);
×
447
  }
448

449
  code = sdbWriteFileHead(pSdb, pFile);
2,898✔
450
  if (code != 0) {
2,898!
451
    mError("failed to write sdb file:%s head since %s", tmpfile, tstrerror(code));
×
452
    int32_t ret = 0;
×
453
    if ((ret = taosCloseFile(&pFile)) != 0) {
×
454
      mError("failed to close sdb file:%s since %s", tmpfile, tstrerror(ret));
×
455
    }
456
    return code;
×
457
  }
458

459
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
86,940✔
460
    if (i == skip_type) continue;
84,042!
461
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
84,042✔
462
    if (encodeFp == NULL) continue;
84,042✔
463

464
    mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
72,450!
465

466
    SHashObj *hash = pSdb->hashObjs[i];
72,450✔
467
    sdbWriteLock(pSdb, i);
72,450✔
468

469
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
72,450✔
470
    while (ppRow != NULL) {
126,832✔
471
      SSdbRow *pRow = *ppRow;
54,382✔
472
      if (pRow == NULL) {
54,382!
473
        ppRow = taosHashIterate(hash, ppRow);
×
474
        continue;
×
475
      }
476

477
      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
54,382✔
478
        sdbPrintOper(pSdb, pRow, "not-write");
350✔
479
        ppRow = taosHashIterate(hash, ppRow);
350✔
480
        continue;
350✔
481
      }
482

483
      sdbPrintOper(pSdb, pRow, "write");
54,032✔
484

485
      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
54,032✔
486
      if (pRaw != NULL) {
54,032!
487
        pRaw->status = pRow->status;
54,032✔
488

489
        if (taosWriteFile(pFile, pRaw, sizeof(SSdbRaw)) != sizeof(SSdbRaw)) {
54,032!
490
          code = terrno;
×
491
          taosHashCancelIterate(hash, ppRow);
×
492
          sdbFreeRaw(pRaw);
×
493
          break;
×
494
        }
495

496
        int32_t newDataLen = pRaw->dataLen;
54,032✔
497
        char   *newData = pRaw->pData;
54,032✔
498
        if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
54,032!
499
          newDataLen = ENCRYPTED_LEN(pRaw->dataLen);
×
500
          newData = taosMemoryMalloc(newDataLen);
×
501
          if (newData == NULL) {
×
502
            code = terrno;
×
503
            taosHashCancelIterate(hash, ppRow);
×
504
            sdbFreeRaw(pRaw);
×
505
            break;
×
506
          }
507

508
          SCryptOpts opts;
509
          opts.len = newDataLen;
×
510
          opts.source = pRaw->pData;
×
511
          opts.result = newData;
×
512
          opts.unitLen = 16;
×
513
          strncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN);
×
514

515
          int32_t count = CBC_Encrypt(&opts);
×
516

517
          // mDebug("write sdb, CBC_Encrypt encryptedDataLen:%d, dataLen:%d, %s",
518
          //       newDataLen, pRaw->dataLen, __FUNCTION__);
519
        }
520

521
        if (taosWriteFile(pFile, newData, newDataLen) != newDataLen) {
54,032!
522
          code = terrno;
×
523
          taosHashCancelIterate(hash, ppRow);
×
524
          sdbFreeRaw(pRaw);
×
525
          break;
×
526
        }
527

528
        if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
54,032!
529
          taosMemoryFree(newData);
×
530
        }
531

532
        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
54,032✔
533
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
54,032!
534
          code = errno;
×
535
          taosHashCancelIterate(hash, ppRow);
×
536
          sdbFreeRaw(pRaw);
×
537
          break;
×
538
        }
539
      } else {
540
        code = TSDB_CODE_APP_ERROR;
×
541
        taosHashCancelIterate(hash, ppRow);
×
542
        break;
×
543
      }
544

545
      sdbFreeRaw(pRaw);
54,032✔
546
      ppRow = taosHashIterate(hash, ppRow);
54,032✔
547
    }
548
    sdbUnLock(pSdb, i);
72,450✔
549
  }
550

551
  if (code == 0) {
2,898!
552
    code = taosFsyncFile(pFile);
2,898✔
553
    if (code != 0) {
2,898!
554
      code = TAOS_SYSTEM_ERROR(errno);
×
555
      mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code));
×
556
    }
557
  }
558

559
  if (taosCloseFile(&pFile) != 0) {
2,898!
560
    code = taosRenameFile(tmpfile, curfile);
×
561
  }
562

563
  if (code == 0) {
2,898!
564
    code = taosRenameFile(tmpfile, curfile);
2,898✔
565
    if (code != 0) {
2,898!
566
      mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
×
567
    }
568
  }
569

570
  if (code != 0) {
2,898!
571
    mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
×
572
  } else {
573
    pSdb->commitIndex = pSdb->applyIndex;
2,898✔
574
    pSdb->commitTerm = pSdb->applyTerm;
2,898✔
575
    pSdb->commitConfig = pSdb->applyConfig;
2,898✔
576
    mInfo("vgId:1, trans:0, write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64
2,898!
577
          " file:%s",
578
          pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
579
  }
580

581
  terrno = code;
2,898✔
582
  return code;
2,898✔
583
}
584

585
/*
 * Persist the sdb to disk when enough entries have been applied since the
 * last commit.
 *
 * Nothing is written (and 0 is returned) when applyIndex equals commitIndex
 * or when the gap between them is smaller than delta. Otherwise, under the
 * file lock, the dump is bracketed by syncBeginSnapshot / syncEndSnapshot
 * whenever both pWal and sync are set.
 *
 * Returns 0 on success or the first non-zero code of the sequence.
 */
int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
  if (pSdb->applyIndex == pSdb->commitIndex || pSdb->applyIndex - pSdb->commitIndex < delta) {
    return 0;
  }

  int32_t code = 0;

  (void)taosThreadMutexLock(&pSdb->filelock);

  if (pSdb->pWal != NULL && pSdb->sync > 0) {
    code = syncBeginSnapshot(pSdb->sync, pSdb->applyIndex);
  }
  if (code == 0) {
    code = sdbWriteFileImp(pSdb, -1);
  }
  if (code == 0 && pSdb->pWal != NULL && pSdb->sync > 0) {
    code = syncEndSnapshot(pSdb->sync);
  }

  if (code == 0) {
    mInfo("vgId:1, trans:0, write sdb file success, apply index:%" PRId64 ", term:%" PRId64 ", config:%" PRId64,
          pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig);
  } else {
    mError("failed to write sdb file since %s", tstrerror(code));
  }

  (void)taosThreadMutexUnlock(&pSdb->filelock);
  return code;
}
620

621
/*
 * Write the sdb file for a dump: delegates to sdbWriteFileImp with
 * skip_type 0, so table type 0 is left out. Unlike sdbWriteFile, no file
 * lock is taken and no snapshot calls are made here.
 */
int32_t sdbWriteFileForDump(SSdb *pSdb) {
  return sdbWriteFileImp(pSdb, 0);
}
628

629
/*
 * Deploy the sdb: run all deploy callbacks, then write the sdb file with a
 * delta of 0. Returns 0 on success; on failure the failing step's code is
 * returned through TAOS_RETURN.
 */
int32_t sdbDeploy(SSdb *pSdb) {
  int32_t code = sdbDeployData(pSdb);
  if (code == 0) {
    code = sdbWriteFile(pSdb, 0);
  }

  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
643

644
/*
 * Allocate a snapshot iterator and give it a file name of the form
 * "<tmpDir>/sdb.data.<iterator address>". The file itself is not opened.
 *
 * Returns the iterator, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when either allocation fails.
 */
static SSdbIter *sdbCreateIter(SSdb *pSdb) {
  SSdbIter *pNew = taosMemoryCalloc(1, sizeof(SSdbIter));
  if (pNew != NULL) {
    char path[PATH_MAX + 100] = {0};
    snprintf(path, sizeof(path), "%s%ssdb.data.%" PRIu64, pSdb->tmpDir, TD_DIRSEP, (uint64_t)pNew);

    pNew->name = taosStrdup(path);
    if (pNew->name != NULL) {
      return pNew;
    }

    // The name could not be duplicated: release the half-built iterator.
    taosMemoryFree(pNew);
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
662

663
/*
 * Tear down a snapshot iterator: close its file if open, remove the file
 * named by the iterator, free the name and finally the iterator itself.
 * Close/remove failures are only logged. A NULL iterator is ignored.
 */
static void sdbCloseIter(SSdbIter *pIter) {
  if (pIter == NULL) {
    return;
  }

  if (pIter->file != NULL) {
    int32_t closeCode = taosCloseFile(&pIter->file);
    if (closeCode != 0) {
      mError("failed to close sdb file since %s", tstrerror(closeCode));
    }
    pIter->file = NULL;
  }

  if (pIter->name != NULL) {
    int32_t removeCode = taosRemoveFile(pIter->name);
    if (removeCode != 0) {
      mError("failed to remove sdb file:%s since %s", pIter->name, tstrerror(removeCode));
    }
    taosMemoryFree(pIter->name);
    pIter->name = NULL;
  }

  mInfo("sdbiter:%p, is closed, total:%" PRId64, pIter, pIter->total);
  taosMemoryFree(pIter);
}
686

687
/*
 * Start reading a snapshot of the sdb.
 *
 * Under the file lock, the commit index/term/config are sampled and
 * "<currDir>/sdb.data" is copied to the iterator's private file; the copy
 * is then opened for reading.
 *
 * On success *ppIter receives the iterator and, for each non-NULL pointer
 * among index, term and config, the sampled commit value is stored.
 * Returns 0 on success, otherwise an error code (the iterator is closed and
 * *ppIter is left untouched). A failed iterator allocation reports terrno,
 * matching sdbStartWrite.
 */
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config) {
  int32_t   code = 0;
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) {
    // sdbCreateIter sets terrno on every NULL return.
    code = terrno;
    TAOS_RETURN(code);
  }

  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  (void)taosThreadMutexLock(&pSdb->filelock);
  int64_t commitIndex = pSdb->commitIndex;
  int64_t commitTerm = pSdb->commitTerm;
  int64_t commitConfig = pSdb->commitConfig;
  if (taosCopyFile(datafile, pIter->name) < 0) {
    code = terrno;
    (void)taosThreadMutexUnlock(&pSdb->filelock);
    mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, tstrerror(code));
    sdbCloseIter(pIter);
    TAOS_RETURN(code);
  }
  (void)taosThreadMutexUnlock(&pSdb->filelock);

  pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
  if (pIter->file == NULL) {
    code = terrno;
    mError("failed to open sdb file:%s since %s", pIter->name, tstrerror(code));
    sdbCloseIter(pIter);
    TAOS_RETURN(code);
  }

  *ppIter = pIter;
  if (index != NULL) *index = commitIndex;
  if (term != NULL) *term = commitTerm;
  if (config != NULL) *config = commitConfig;

  mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
        pIter, commitIndex, commitTerm, commitConfig, pIter->name);
  return 0;
}
725

726
/*
 * Finish a snapshot read by closing the iterator (which also removes its
 * temporary file). pSdb is not used.
 */
void sdbStopRead(SSdb *pSdb, SSdbIter *pIter) {
  sdbCloseIter(pIter);
}
×
727

728
/*
 * Read the next chunk (up to 4096 bytes) of a snapshot.
 *
 * On success with data, *ppBuf receives a newly allocated buffer holding
 * *len bytes, and pIter->total is advanced; the buffer is not freed here.
 * At end of file *ppBuf is NULL, *len is 0 and 0 is returned.
 * On a read failure *ppBuf is NULL, *len is 0 and terrno is returned.
 */
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
  const int32_t chunkSize = 4096;

  void *pChunk = taosMemoryCalloc(1, chunkSize);
  if (pChunk == NULL) {
    int32_t code = terrno;
    TAOS_RETURN(code);
  }

  int32_t nread = taosReadFile(pIter->file, pChunk, chunkSize);

  // Normal case: hand the filled chunk over to the caller.
  if (nread > 0 && nread <= chunkSize) {
    pIter->total += nread;
    mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, nread, pIter->total);
    *ppBuf = pChunk;
    *len = nread;
    return 0;
  }

  // Neither end of file nor an error hands out a buffer.
  *ppBuf = NULL;
  *len = 0;

  if (nread == 0) {
    mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
    taosMemoryFree(pChunk);
    return 0;
  }

  int32_t code = terrno;
  mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, tstrerror(code), pIter->total);
  taosMemoryFree(pChunk);
  TAOS_RETURN(code);
}
759

760
/*
 * Start receiving a snapshot: create an iterator and open its private file
 * for writing (created and truncated).
 *
 * On success *ppIter receives the iterator and 0 is returned; otherwise
 * terrno is returned and *ppIter is left untouched.
 */
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
  SSdbIter *pNew = sdbCreateIter(pSdb);
  if (pNew == NULL) {
    int32_t code = terrno;
    TAOS_RETURN(code);
  }

  pNew->file = taosOpenFile(pNew->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pNew->file == NULL) {
    int32_t code = terrno;
    mError("failed to open %s since %s", pNew->name, tstrerror(code));
    sdbCloseIter(pNew);
    TAOS_RETURN(code);
  }

  *ppIter = pNew;
  mInfo("sdbiter:%p, is created to write snapshot, file:%s", pNew, pNew->name);
  return 0;
}
780

781
/*
 * Finish receiving a snapshot.
 *
 * When isApply is false the received file is simply discarded. Otherwise
 * the file is fsynced, closed, renamed over "<currDir>/sdb.data" and loaded
 * with sdbReadFile; afterwards each of config, term and index that is > 0
 * overrides the corresponding commit value.
 *
 * The iterator is always closed before returning. Returns 0 on success or
 * when not applying, otherwise the error code of the failing step.
 */
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, int64_t term, int64_t config) {
  int32_t code = -1;
  char    datafile[PATH_MAX] = {0};

  if (!isApply) {
    mInfo("sdbiter:%p, not apply to sdb", pIter);
    code = 0;
    goto _OVER;
  }

  if (taosFsyncFile(pIter->file) != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    mError("sdbiter:%p, failed to fasync file %s since %s", pIter, pIter->name, tstrerror(code));
    goto _OVER;
  }

  if (taosCloseFile(&pIter->file) != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }
  pIter->file = NULL;

  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  if ((code = taosRenameFile(pIter->name, datafile)) != 0) {
    mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, tstrerror(code));
    goto _OVER;
  }

  if ((code = sdbReadFile(pSdb)) != 0) {
    mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, tstrerror(code));
    goto _OVER;
  }

  // Caller-supplied positions take precedence over the values just loaded.
  if (config > 0) pSdb->commitConfig = config;
  if (term > 0) pSdb->commitTerm = term;
  if (index > 0) pSdb->commitIndex = index;

  mInfo("sdbiter:%p, success applyed to sdb", pIter);
  code = 0;

_OVER:
  sdbCloseIter(pIter);
  return code;
}
833

834
/*
 * Append len bytes from pBuf to the snapshot file of the iterator and add
 * them to pIter->total. Returns 0 on success, or terrno when fewer than
 * len bytes were written.
 */
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
  int32_t nwritten = taosWriteFile(pIter->file, pBuf, len);
  if (nwritten == len) {
    pIter->total += nwritten;
    mInfo("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, nwritten, pIter->total);
    return 0;
  }

  int32_t code = terrno;
  mError("failed to write len:%d since %s, total:%" PRId64, len, tstrerror(code), pIter->total);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc