• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4881

14 Dec 2025 03:48AM UTC coverage: 60.617% (+0.5%) from 60.092%
#4881

push

travis-ci

web-flow
test: update coverage workflow time (#33918)

156854 of 258761 relevant lines covered (60.62%)

75258957.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.39
/source/dnode/mnode/sdb/src/sdbFile.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "crypt.h"
18
#include "sdb.h"
19
#include "sync.h"
20
#include "tchecksum.h"
21
#include "tglobal.h"
22
#include "wal.h"
23

24
#define SDB_TABLE_SIZE   24
25
#define SDB_RESERVE_SIZE 512
26
#define SDB_FILE_VER     1
27

28
#define SDB_TABLE_SIZE_EXTRA   SDB_MAX
29
#define SDB_RESERVE_SIZE_EXTRA (512 - (SDB_TABLE_SIZE_EXTRA - SDB_TABLE_SIZE) * 2 * sizeof(int64_t))
30

31
// Invokes every registered deploy callback, walking the table types from
// SDB_MAX - 1 down to 0 and skipping types that have no callback.
// Returns 0 when all callbacks succeed, or -1 as soon as one returns a
// non-zero code (that code is logged, not returned).
static int32_t sdbDeployData(SSdb *pSdb) {
  mInfo("start to deploy sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbDeployFp deployFp = pSdb->deployFps[type];
    if (deployFp != NULL) {
      mInfo("start to deploy sdb:%s", sdbTableName(type));

      int32_t code = deployFp(pSdb->pMnode);
      if (code != 0) {
        mError("failed to deploy sdb:%s since %s", sdbTableName(type), tstrerror(code));
        return -1;
      }
    }
  }

  mInfo("sdb deploy success");
  return 0;
}
50

51
// Invokes every registered upgrade callback with the given version, walking
// the table types from SDB_MAX - 1 down to 0 and skipping types that have no
// callback.
// Returns 0 when all callbacks succeed, or -1 as soon as one returns a
// non-zero code (that code is logged, not returned).
static int32_t sdbUpgradeData(SSdb *pSdb, int32_t version) {
  mInfo("start to upgrade sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbUpgradeFp upgradeFp = pSdb->upgradeFps[type];
    if (upgradeFp != NULL) {
      mInfo("start to upgrade sdb:%s", sdbTableName(type));

      int32_t code = upgradeFp(pSdb->pMnode, version);
      if (code != 0) {
        mError("failed to upgrade sdb:%s since %s", sdbTableName(type), tstrerror(code));
        return -1;
      }
    }
  }

  mInfo("sdb upgrade success");
  return 0;
}
70

71
// Invokes every registered after-restored callback, walking the table types
// from SDB_MAX - 1 down to 0 and skipping types that have no callback.
// Returns 0 when all callbacks succeed, or -1 as soon as one returns a
// non-zero code (that code is logged, not returned).
static int32_t sdbAfterRestoredData(SSdb *pSdb) {
  mInfo("start to prepare sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbAfterRestoredFp restoredFp = pSdb->afterRestoredFps[type];
    if (restoredFp != NULL) {
      mInfo("start to prepare sdb:%s", sdbTableName(type));

      int32_t code = restoredFp(pSdb->pMnode);
      if (code != 0) {
        mError("failed to prepare sdb:%s since %s", sdbTableName(type), tstrerror(code));
        return -1;
      }
    }
  }

  mInfo("sdb prepare success");
  return 0;
}
90

91
// Drops every row held in memory and resets the per-table and global
// counters of the sdb.
//
// For each table type that has a hash object, the table is write-locked, all
// non-NULL rows are released with sdbFreeRow(), the hash is cleared, and the
// table's tableVer and maxId are zeroed. Afterwards the apply/commit
// index, term and config are all set to -1.
static void sdbResetData(SSdb *pSdb) {
  mInfo("start to reset sdb");

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    sdbWriteLock(pSdb, i);

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow != NULL) {
        sdbFreeRow(pSdb, pRow, true);
      }
      // Always advance the iterator, even for a NULL row; skipping the
      // advance would re-visit the same slot forever while holding the lock.
      ppRow = taosHashIterate(hash, ppRow);
    }

    taosHashClear(pSdb->hashObjs[i]);
    pSdb->tableVer[i] = 0;
    pSdb->maxId[i] = 0;

    sdbUnLock(pSdb, i);

    mInfo("sdb:%s is reset", sdbTableName(i));
  }

  pSdb->applyIndex = -1;
  pSdb->applyTerm = -1;
  pSdb->applyConfig = -1;
  pSdb->commitIndex = -1;
  pSdb->commitTerm = -1;
  pSdb->commitConfig = -1;
  mInfo("sdb reset success");
}
63,051✔
126

127
// Reads one int64_t field of the sdb file head into *pVal.
// Returns 0 on success, terrno when taosReadFile() returns a negative value,
// or TSDB_CODE_FILE_CORRUPTED when fewer bytes than requested were read.
// NOTE(review): relies on taosReadFile() setting terrno on failure, as the
// surrounding code already does - confirm against the osFile implementation.
static int32_t sdbReadHeadInt64(TdFilePtr pFile, int64_t *pVal) {
  int64_t nread = taosReadFile(pFile, pVal, sizeof(int64_t));
  if (nread < 0) {
    return terrno;
  }
  if (nread != (int64_t)sizeof(int64_t)) {
    int32_t code = TSDB_CODE_FILE_CORRUPTED;
    TAOS_RETURN(code);
  }
  return 0;
}

// Parses the fixed-size head of an sdb data file into pSdb.
//
// Layout, in read order:
//   1. file version (must equal SDB_FILE_VER)
//   2. applyIndex, applyTerm, applyConfig
//   3. SDB_TABLE_SIZE maxId values, then SDB_TABLE_SIZE tableVer values
//   4. for each type in [SDB_TABLE_SIZE, SDB_TABLE_SIZE_EXTRA): a maxId
//      followed by a tableVer (the "compatibility" area)
//   5. SDB_RESERVE_SIZE_EXTRA reserved bytes, which are read and discarded
//
// Values for type indexes >= SDB_MAX are read but not stored.
// Returns 0 on success, terrno on a read error, or TSDB_CODE_FILE_CORRUPTED
// on a short read or a version mismatch.
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int32_t code = 0;
  int64_t sver = 0;

  code = sdbReadHeadInt64(pFile, &sver);
  if (code != 0) return code;
  if (sver != SDB_FILE_VER) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TAOS_RETURN(code);
  }

  code = sdbReadHeadInt64(pFile, &pSdb->applyIndex);
  if (code != 0) return code;

  code = sdbReadHeadInt64(pFile, &pSdb->applyTerm);
  if (code != 0) return code;

  code = sdbReadHeadInt64(pFile, &pSdb->applyConfig);
  if (code != 0) return code;

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t maxId = 0;
    code = sdbReadHeadInt64(pFile, &maxId);
    if (code != 0) return code;
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t ver = 0;
    code = sdbReadHeadInt64(pFile, &ver);
    if (code != 0) return code;
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  // for sdb compatibility
  for (int32_t i = SDB_TABLE_SIZE; i < SDB_TABLE_SIZE_EXTRA; ++i) {
    int64_t maxId = 0;
    code = sdbReadHeadInt64(pFile, &maxId);
    if (code != 0) return code;
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }

    int64_t ver = 0;
    code = sdbReadHeadInt64(pFile, &ver);
    if (code != 0) return code;
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  // The reserved tail carries no data; it is consumed only to position the
  // file at the first record and to detect a truncated head.
  char    reserve[SDB_RESERVE_SIZE_EXTRA] = {0};
  int64_t nread = taosReadFile(pFile, reserve, sizeof(reserve));
  if (nread < 0) {
    return terrno;
  }
  if (nread != (int64_t)sizeof(reserve)) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TAOS_RETURN(code);
  }

  return 0;
}
243

244
// Writes a single int64_t head field; true when all bytes were written.
static bool sdbPutHeadInt64(TdFilePtr pFile, int64_t val) {
  return taosWriteFile(pFile, &val, sizeof(int64_t)) == sizeof(int64_t);
}

// Serializes the fixed-size head of an sdb data file from pSdb.
//
// Write order: SDB_FILE_VER, applyIndex, applyTerm, applyConfig,
// SDB_TABLE_SIZE maxId values, SDB_TABLE_SIZE tableVer values, then a
// (maxId, tableVer) pair for every type in
// [SDB_TABLE_SIZE, SDB_TABLE_SIZE_EXTRA), and finally
// SDB_RESERVE_SIZE_EXTRA zero bytes. Slots whose index is >= SDB_MAX are
// written as 0.
// Returns 0 on success, or terrno as soon as any write is short.
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
  if (!sdbPutHeadInt64(pFile, SDB_FILE_VER)) {
    return terrno;
  }

  mInfo("vgId:1, write sdb file with sdb applyIndex:%" PRId64 " term:%" PRId64 " config:%" PRId64, pSdb->applyIndex,
        pSdb->applyTerm, pSdb->applyConfig);

  if (!sdbPutHeadInt64(pFile, pSdb->applyIndex)) {
    return terrno;
  }
  if (!sdbPutHeadInt64(pFile, pSdb->applyTerm)) {
    return terrno;
  }
  if (!sdbPutHeadInt64(pFile, pSdb->applyConfig)) {
    return terrno;
  }

  for (int32_t type = 0; type < SDB_TABLE_SIZE; ++type) {
    int64_t maxId = (type < SDB_MAX) ? pSdb->maxId[type] : 0;
    if (!sdbPutHeadInt64(pFile, maxId)) {
      return terrno;
    }
  }

  for (int32_t type = 0; type < SDB_TABLE_SIZE; ++type) {
    int64_t ver = (type < SDB_MAX) ? pSdb->tableVer[type] : 0;
    if (!sdbPutHeadInt64(pFile, ver)) {
      return terrno;
    }
  }

  // for sdb compatibility
  for (int32_t type = SDB_TABLE_SIZE; type < SDB_TABLE_SIZE_EXTRA; ++type) {
    int64_t maxId = (type < SDB_MAX) ? pSdb->maxId[type] : 0;
    if (!sdbPutHeadInt64(pFile, maxId)) {
      return terrno;
    }

    int64_t ver = (type < SDB_MAX) ? pSdb->tableVer[type] : 0;
    if (!sdbPutHeadInt64(pFile, ver)) {
      return terrno;
    }
  }

  char reserve[SDB_RESERVE_SIZE_EXTRA] = {0};
  if (taosWriteFile(pFile, reserve, sizeof(reserve)) != sizeof(reserve)) {
    return terrno;
  }

  return 0;
}
310

311
// Loads "<currDir>/sdb.data" into pSdb.
//
// The file head is parsed with sdbReadFileHead(); the remainder is a sequence
// of records, each an SSdbRaw header followed by the payload and an int32_t
// checksum. When SM4 encryption is enabled for the sdb scope the payload on
// disk has length ENCRYPTED_LEN(dataLen) and is decrypted in place before the
// checksum is verified. Every record whose type is below SDB_MAX is applied
// with sdbWriteWithoutFree(); other types are skipped.
//
// On success the commit index/term/config are set to the apply values and the
// tableVer array read from the head is restored (it is saved before the
// records are applied and written back afterwards).
//
// Returns 0 on success and also when the file cannot be opened (the open
// failure is only logged); otherwise an error code.
static int32_t sdbReadFileImp(SSdb *pSdb) {
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;
  char    file[PATH_MAX] = {0};
  int32_t bufLen = TSDB_MAX_MSG_SIZE;

  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mInfo("start to read sdb file:%s", file);

  SSdbRaw *pRaw = taosMemoryMalloc(bufLen + 100);
  if (pRaw == NULL) {
    code = terrno;
    mError("failed read sdb file since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    // Capture the reason before any other call can overwrite terrno.
    code = terrno;
    taosMemoryFree(pRaw);
    mInfo("read sdb file:%s finished since %s", file, tstrerror(code));
    return 0;
  }

  code = sdbReadFileHead(pSdb, pFile);
  if (code != 0) {
    mError("failed to read sdb file:%s head since %s", file, tstrerror(code));
    taosMemoryFree(pRaw);
    int32_t closeCode = taosCloseFile(&pFile);
    if (closeCode != 0) {
      mError("failed to close sdb file:%s since %s", file, tstrerror(closeCode));
    }
    return code;
  }

  // Saved here and written back after the loop, so whatever applying the
  // records does to pSdb->tableVer does not survive the load.
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

  while (1) {
    readLen = sizeof(SSdbRaw);
    ret = taosReadFile(pFile, pRaw, readLen);
    if (ret == 0) break;  // clean end of file

    if (ret < 0) {
      code = terrno;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " != readLen:%d", file, tstrerror(code), ret, readLen);
      goto _OVER;
    }

    // dataLen comes straight from disk and drives buffer sizing and memcpy
    // lengths below; reject values that are negative or whose doubled size
    // (plus slack) would overflow int32_t.
    if (pRaw->dataLen < 0 || pRaw->dataLen > (INT32_MAX - 100) / 2) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s, invalid dataLen:%d", file, tstrerror(code), pRaw->dataLen);
      goto _OVER;
    }

    const bool encrypted = (tsiEncryptAlgorithm == DND_CA_SM4) && ((tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB);
    int32_t    encLen = 0;

    readLen = pRaw->dataLen + sizeof(int32_t);
    if (encrypted) {
      encLen = ENCRYPTED_LEN(pRaw->dataLen);
      readLen = encLen + sizeof(int32_t);
    }

    if (readLen >= bufLen) {
      // Grow the record buffer; only the already-read header is carried over.
      bufLen = pRaw->dataLen * 2;
      SSdbRaw *pNewRaw = taosMemoryMalloc(bufLen + 100);
      if (pNewRaw == NULL) {
        code = terrno;
        mError("failed read sdb file since malloc new sdbRaw size:%d failed", bufLen);
        goto _OVER;
      }
      mInfo("malloc new sdb raw size:%d, type:%d", bufLen, pRaw->type);
      memcpy(pNewRaw, pRaw, sizeof(SSdbRaw));
      sdbFreeRaw(pRaw);
      pRaw = pNewRaw;
    }

    ret = taosReadFile(pFile, pRaw->pData, readLen);
    if (ret < 0) {
      code = terrno;
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " readLen:%d", file, tstrerror(code), ret, readLen);
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " != readLen:%d", file, tstrerror(code), ret, readLen);
      goto _OVER;
    }

    if (encrypted) {
      char *plantContent = taosMemoryMalloc(encLen);
      if (plantContent == NULL) {
        code = terrno;
        goto _OVER;
      }

      SCryptOpts opts = {0};
      opts.len = encLen;
      opts.source = pRaw->pData;
      opts.result = plantContent;
      opts.unitLen = 16;
      tstrncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN + 1);

      int32_t count = Builtin_CBC_Decrypt(&opts);
      if (count <= 0) {
        code = terrno;
        taosMemoryFree(plantContent);  // do not leak the scratch buffer on failure
        goto _OVER;
      }

      // mDebug("read sdb, CBC Decrypt dataLen:%d, descrypted len:%d, %s", pRaw->dataLen, count, __FUNCTION__);

      memcpy(pRaw->pData, plantContent, pRaw->dataLen);
      taosMemoryFree(plantContent);
      // Move the checksum from behind the encrypted payload to directly
      // behind the plain payload. Both ranges lie in pRaw->pData and can
      // overlap, hence memmove rather than memcpy.
      memmove(pRaw->pData + pRaw->dataLen, &pRaw->pData[encLen], sizeof(int32_t));
    }

    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
      code = TSDB_CODE_CHECKSUM_ERROR;
      mError("failed to read sdb file:%s since %s, readLen:%d", file, tstrerror(code), readLen);
      goto _OVER;
    }

    if (pRaw->type >= SDB_MAX) {
      mInfo("skip sdb raw type:%d since it is not supported", pRaw->type);
      continue;
    }

    code = sdbWriteWithoutFree(pSdb, pRaw);
    if (code != 0) {
      mError("failed to exec sdbWrite while read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }
  }

  code = 0;
  pSdb->commitIndex = pSdb->applyIndex;
  pSdb->commitTerm = pSdb->applyTerm;
  pSdb->commitConfig = pSdb->applyConfig;
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
  mInfo("vgId:1, trans:0, read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file,
        pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig);

_OVER:
  if ((ret = taosCloseFile(&pFile)) != 0) {
    mError("failed to close sdb file:%s since %s", file, tstrerror(ret));
  }
  sdbFreeRaw(pRaw);

  TAOS_RETURN(code);
}
463

464
// Reloads the sdb from its data file while holding pSdb->filelock.
// The in-memory state is reset before the load and reset again if the load
// fails. Returns the code produced by sdbReadFileImp().
int32_t sdbReadFile(SSdb *pSdb) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pSdb->filelock);

  sdbResetData(pSdb);
  code = sdbReadFileImp(pSdb);
  if (code != 0) {
    mError("failed to read sdb file since %s", tstrerror(code));
    // Discard whatever was loaded before the failure.
    sdbResetData(pSdb);
  }

  (void)taosThreadMutexUnlock(&pSdb->filelock);

  return code;
}
477

478
static int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) {
447,638✔
479
  int32_t code = 0;
447,638✔
480

481
  char tmpfile[PATH_MAX] = {0};
447,638✔
482
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
447,638✔
483
  char curfile[PATH_MAX] = {0};
447,638✔
484
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
447,638✔
485

486
  mInfo("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
447,638✔
487
        " term:%" PRId64 " config:%" PRId64 ", file:%s",
488
        pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
489
        curfile);
490

491
  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
447,638✔
492
  if (pFile == NULL) {
447,638✔
493
    code = terrno;
×
494
    mError("failed to open sdb file:%s for write since %s", tmpfile, tstrerror(code));
×
495
    TAOS_RETURN(code);
×
496
  }
497

498
  code = sdbWriteFileHead(pSdb, pFile);
447,638✔
499
  if (code != 0) {
447,638✔
500
    mError("failed to write sdb file:%s head since %s", tmpfile, tstrerror(code));
×
501
    int32_t ret = 0;
×
502
    if ((ret = taosCloseFile(&pFile)) != 0) {
×
503
      mError("failed to close sdb file:%s since %s", tmpfile, tstrerror(ret));
×
504
    }
505
    return code;
×
506
  }
507

508
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
18,353,158✔
509
    if (i == skip_type) continue;
17,905,520✔
510
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
17,905,520✔
511
    if (encodeFp == NULL) continue;
17,905,520✔
512

513
    mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
15,667,330✔
514

515
    SHashObj *hash = pSdb->hashObjs[i];
15,667,330✔
516
    sdbWriteLock(pSdb, i);
15,667,330✔
517

518
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
15,667,330✔
519
    while (ppRow != NULL) {
70,447,206✔
520
      SSdbRow *pRow = *ppRow;
54,779,876✔
521
      if (pRow == NULL) {
54,779,876✔
522
        ppRow = taosHashIterate(hash, ppRow);
×
523
        continue;
×
524
      }
525

526
      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
54,779,876✔
527
        sdbPrintOper(pSdb, pRow, "not-write");
13,531✔
528
        ppRow = taosHashIterate(hash, ppRow);
13,531✔
529
        continue;
13,531✔
530
      }
531

532
      sdbPrintOper(pSdb, pRow, "write");
54,766,345✔
533

534
      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
54,766,345✔
535
      if (pRaw != NULL) {
54,766,345✔
536
        pRaw->status = pRow->status;
54,766,345✔
537

538
        if (taosWriteFile(pFile, pRaw, sizeof(SSdbRaw)) != sizeof(SSdbRaw)) {
54,766,345✔
539
          code = terrno;
×
540
          taosHashCancelIterate(hash, ppRow);
×
541
          sdbFreeRaw(pRaw);
×
542
          break;
×
543
        }
544

545
        int32_t newDataLen = pRaw->dataLen;
54,766,345✔
546
        char   *newData = pRaw->pData;
54,766,345✔
547
        if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
54,766,345✔
548
          newDataLen = ENCRYPTED_LEN(pRaw->dataLen);
×
549
          newData = taosMemoryMalloc(newDataLen);
×
550
          if (newData == NULL) {
×
551
            code = terrno;
×
552
            taosHashCancelIterate(hash, ppRow);
×
553
            sdbFreeRaw(pRaw);
×
554
            break;
×
555
          }
556

557
          SCryptOpts opts = {0};
×
558
          opts.len = newDataLen;
×
559
          opts.source = pRaw->pData;
×
560
          opts.result = newData;
×
561
          opts.unitLen = 16;
×
562
          tstrncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
×
563

564
          int32_t count = Builtin_CBC_Encrypt(&opts);
×
565
          if (count <= 0) {
×
566
            code = terrno;
×
567
            taosHashCancelIterate(hash, ppRow);
×
568
            sdbFreeRaw(pRaw);
×
569
            break;
×
570
          }
571

572
          // mDebug("write sdb, CBC Encrypt encryptedDataLen:%d, dataLen:%d, %s",
573
          //       newDataLen, pRaw->dataLen, __FUNCTION__);
574
        }
575

576
        if (taosWriteFile(pFile, newData, newDataLen) != newDataLen) {
54,766,345✔
577
          code = terrno;
×
578
          taosHashCancelIterate(hash, ppRow);
×
579
          sdbFreeRaw(pRaw);
×
580
          break;
×
581
        }
582

583
        if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
54,766,345✔
584
          taosMemoryFree(newData);
×
585
        }
586

587
        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
54,766,345✔
588
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
54,766,345✔
589
          code = terrno;
×
590
          taosHashCancelIterate(hash, ppRow);
×
591
          sdbFreeRaw(pRaw);
×
592
          break;
×
593
        }
594
      } else {
595
        code = TSDB_CODE_APP_ERROR;
×
596
        taosHashCancelIterate(hash, ppRow);
×
597
        break;
×
598
      }
599

600
      sdbFreeRaw(pRaw);
54,766,345✔
601
      ppRow = taosHashIterate(hash, ppRow);
54,766,345✔
602
    }
603
    sdbUnLock(pSdb, i);
15,667,330✔
604
  }
605

606
  if (code == 0) {
447,638✔
607
    code = taosFsyncFile(pFile);
447,638✔
608
    if (code != 0) {
447,638✔
609
      code = TAOS_SYSTEM_ERROR(ERRNO);
×
610
      mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code));
×
611
    }
612
  }
613

614
  if (taosCloseFile(&pFile) != 0) {
447,638✔
615
    code = taosRenameFile(tmpfile, curfile);
×
616
  }
617

618
  if (code == 0) {
447,638✔
619
    code = taosRenameFile(tmpfile, curfile);
447,638✔
620
    if (code != 0) {
447,638✔
621
      mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
×
622
    }
623
  }
624

625
  if (code != 0) {
447,638✔
626
    mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
×
627
  } else {
628
    pSdb->commitIndex = pSdb->applyIndex;
447,638✔
629
    pSdb->commitTerm = pSdb->applyTerm;
447,638✔
630
    pSdb->commitConfig = pSdb->applyConfig;
447,638✔
631
    mInfo("vgId:1, trans:0, write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64
447,638✔
632
          " file:%s",
633
          pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
634
  }
635

636
  terrno = code;
447,638✔
637
  return code;
447,638✔
638
}
639

640
// Persists the sdb to disk when enough entries have been applied since the
// last commit.
//
// Nothing is written (and 0 is returned) when applyIndex equals commitIndex
// or when their difference is smaller than delta. Otherwise, under
// pSdb->filelock, the dump is bracketed by syncBeginSnapshot() and
// syncEndSnapshot() whenever both pSdb->pWal is set and pSdb->sync is
// positive; syncEndSnapshot() is only called when every previous step
// succeeded. Returns 0 or the first non-zero code encountered.
int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
  if (pSdb->applyIndex == pSdb->commitIndex) {
    return 0;
  }
  if (pSdb->applyIndex - pSdb->commitIndex < delta) {
    return 0;
  }

  int32_t code = 0;

  (void)taosThreadMutexLock(&pSdb->filelock);

  if (pSdb->pWal != NULL && pSdb->sync > 0) {
    code = syncBeginSnapshot(pSdb->sync, pSdb->applyIndex);
  }

  if (code == 0) {
    code = sdbWriteFileImp(pSdb, -1);
  }

  if (code == 0 && pSdb->pWal != NULL && pSdb->sync > 0) {
    code = syncEndSnapshot(pSdb->sync, false);
  }

  if (code == 0) {
    mInfo("vgId:1, trans:0, write sdb file success, apply index:%" PRId64 ", term:%" PRId64 ", config:%" PRId64,
          pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig);
  } else {
    mError("failed to write sdb file since %s", tstrerror(code));
  }

  (void)taosThreadMutexUnlock(&pSdb->filelock);
  return code;
}
675

676
// Dumps the sdb to its data file, leaving out the table whose type equals
// skip_type. Thin wrapper over sdbWriteFileImp(); unlike sdbWriteFile() it
// takes no lock and does no snapshot bracketing itself.
int32_t sdbWriteFileForDump(SSdb *pSdb, int32_t skip_type) {
  return sdbWriteFileImp(pSdb, skip_type);
}
683

684
// Runs the deploy callbacks and then writes the sdb file with delta 0.
// Returns 0 on success; a failure of either step is returned through
// TAOS_RETURN.
int32_t sdbDeploy(SSdb *pSdb) {
  int32_t code = sdbDeployData(pSdb);
  if (code == 0) {
    code = sdbWriteFile(pSdb, 0);
  }

  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
698

699
// Runs the upgrade callbacks for the given version and then writes the sdb
// file with delta 0.
// Returns 0 on success; a failure of either step is returned through
// TAOS_RETURN.
int32_t sdbUpgrade(SSdb *pSdb, int32_t version) {
  int32_t code = sdbUpgradeData(pSdb, version);
  if (code == 0) {
    code = sdbWriteFile(pSdb, 0);
  }

  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
713

714
// Runs the after-restored callbacks.
// Returns 0 on success; a failure is returned through TAOS_RETURN.
int32_t sdbAfterRestored(SSdb *pSdb) {
  int32_t code = sdbAfterRestoredData(pSdb);
  if (code == 0) {
    return 0;
  }
  TAOS_RETURN(code);
}
722

723
// Allocates a zeroed snapshot iterator and gives it a file name of the form
// "<tmpDir>/sdb.data.<iterator address>", so the name differs per iterator.
// No file is opened here.
// Returns the iterator, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY
// when either allocation fails.
static SSdbIter *sdbCreateIter(SSdb *pSdb) {
  SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));

  if (pIter != NULL) {
    char name[PATH_MAX + 100] = {0};
    snprintf(name, sizeof(name), "%s%ssdb.data.%" PRIu64, pSdb->tmpDir, TD_DIRSEP, (uint64_t)pIter);

    pIter->name = taosStrdup(name);
    if (pIter->name != NULL) {
      return pIter;
    }

    // The name could not be duplicated: give the iterator back.
    taosMemoryFree(pIter);
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
741

742
// Releases a snapshot iterator: closes its file if one is open, removes the
// file named pIter->name from disk, frees the name and finally the iterator.
// Close and remove failures are logged and otherwise ignored.
// A NULL iterator is accepted and does nothing.
static void sdbCloseIter(SSdbIter *pIter) {
  if (pIter == NULL) {
    return;
  }

  int32_t ret = 0;

  if (pIter->file != NULL) {
    ret = taosCloseFile(&pIter->file);
    if (ret != 0) {
      mError("failed to close sdb file since %s", tstrerror(ret));
    }
    pIter->file = NULL;
  }

  if (pIter->name != NULL) {
    ret = taosRemoveFile(pIter->name);
    if (ret != 0) {
      mError("failed to remove sdb file:%s since %s", pIter->name, tstrerror(ret));
    }
    taosMemoryFree(pIter->name);
    pIter->name = NULL;
  }

  mInfo("sdbiter:%p, is closed, total:%" PRId64, pIter, pIter->total);
  taosMemoryFree(pIter);
}
765

766
// Starts reading a snapshot of the sdb.
//
// Under pSdb->filelock the current commit index/term/config are sampled and
// "<currDir>/sdb.data" is copied to the iterator's private file, so the
// reported commit values match the copied content. The copy is then opened
// for reading outside the lock.
//
// On success *ppIter receives the iterator and, for each of index, term and
// config that is non-NULL, the sampled commit value is stored. On failure
// *ppIter is left untouched, the iterator (if any) is closed, and a non-zero
// error code is returned.
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config) {
  int32_t   code = 0;
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) {
    // sdbCreateIter() sets terrno on failure; report it the same way
    // sdbStartWrite() does instead of a bare -1.
    code = terrno;
    TAOS_RETURN(code);
  }

  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  (void)taosThreadMutexLock(&pSdb->filelock);
  int64_t commitIndex = pSdb->commitIndex;
  int64_t commitTerm = pSdb->commitTerm;
  int64_t commitConfig = pSdb->commitConfig;
  if (taosCopyFile(datafile, pIter->name) < 0) {
    // Read terrno before unlocking so a later call cannot overwrite it.
    code = terrno;
    (void)taosThreadMutexUnlock(&pSdb->filelock);
    mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, tstrerror(code));
    sdbCloseIter(pIter);
    TAOS_RETURN(code);
  }
  (void)taosThreadMutexUnlock(&pSdb->filelock);

  pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
  if (pIter->file == NULL) {
    code = terrno;
    mError("failed to open sdb file:%s since %s", pIter->name, tstrerror(code));
    sdbCloseIter(pIter);
    TAOS_RETURN(code);
  }

  *ppIter = pIter;
  if (index != NULL) *index = commitIndex;
  if (term != NULL) *term = commitTerm;
  if (config != NULL) *config = commitConfig;

  mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
        pIter, commitIndex, commitTerm, commitConfig, pIter->name);
  return 0;
}
804

805
// Ends a snapshot read by closing the iterator. pSdb is not used.
void sdbStopRead(SSdb *pSdb, SSdbIter *pIter) {
  (void)pSdb;
  sdbCloseIter(pIter);
}
×
806

807
// Reads the next chunk (at most 4096 bytes) of the snapshot file.
//
// On success with data, *ppBuf receives a newly allocated buffer that this
// function does not free, *len its byte count, and pIter->total is advanced.
// At end of file, *ppBuf is NULL, *len is 0 and 0 is returned.
// On a read error (negative or oversized result) *ppBuf is NULL, *len is 0
// and terrno is returned through TAOS_RETURN.
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
  int32_t code = 0;
  int32_t maxlen = 4096;

  void *pBuf = taosMemoryCalloc(1, maxlen);
  if (pBuf == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);

  if (readlen == 0) {
    mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
    *ppBuf = NULL;
    *len = 0;
    taosMemoryFree(pBuf);
    return 0;
  }

  if (readlen < 0 || readlen > maxlen) {
    code = terrno;
    mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, tstrerror(code), pIter->total);
    *ppBuf = NULL;
    *len = 0;
    taosMemoryFree(pBuf);
    TAOS_RETURN(code);
  }

  // 0 < readlen <= maxlen: hand the buffer over to the caller.
  pIter->total += readlen;
  mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
  *ppBuf = pBuf;
  *len = readlen;
  return 0;
}
838

839
// Starts receiving a snapshot: creates an iterator and opens its private
// file for writing (created if missing, truncated otherwise).
// On success *ppIter receives the iterator and 0 is returned. On failure the
// iterator is closed, *ppIter is left untouched and terrno is returned
// through TAOS_RETURN.
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
  int32_t code = 0;

  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  pIter->file = taosOpenFile(pIter->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pIter->file != NULL) {
    *ppIter = pIter;
    mInfo("sdbiter:%p, is created to write snapshot, file:%s", pIter, pIter->name);
    return 0;
  }

  code = terrno;
  mError("failed to open %s since %s", pIter->name, tstrerror(code));
  sdbCloseIter(pIter);
  TAOS_RETURN(code);
}
859

860
// Finishes receiving a snapshot and always closes the iterator.
//
// When isApply is false nothing is applied and 0 is returned. Otherwise the
// received file is fsynced, closed, renamed to "<currDir>/sdb.data" and
// loaded with sdbReadFile(); afterwards each of config, term and index that
// is greater than 0 overrides the corresponding commit value.
// Returns 0 on success or the code of the first step that failed.
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, int64_t term, int64_t config) {
  int32_t code = -1;
  char    datafile[PATH_MAX] = {0};

  // Single-pass block: every early exit falls through to the common cleanup.
  do {
    if (!isApply) {
      mInfo("sdbiter:%p, not apply to sdb", pIter);
      code = 0;
      break;
    }

    if (taosFsyncFile(pIter->file) != 0) {
      code = TAOS_SYSTEM_ERROR(ERRNO);
      mError("sdbiter:%p, failed to fasync file %s since %s", pIter, pIter->name, tstrerror(code));
      break;
    }

    if (taosCloseFile(&pIter->file) != 0) {
      code = TAOS_SYSTEM_ERROR(ERRNO);
      break;
    }
    pIter->file = NULL;

    snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
    code = taosRenameFile(pIter->name, datafile);
    if (code != 0) {
      mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, tstrerror(code));
      break;
    }

    code = sdbReadFile(pSdb);
    if (code != 0) {
      mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, tstrerror(code));
      break;
    }

    // Caller-supplied values, when positive, replace what the file load set.
    if (config > 0) {
      pSdb->commitConfig = config;
    }
    if (term > 0) {
      pSdb->commitTerm = term;
    }
    if (index > 0) {
      pSdb->commitIndex = index;
    }

    mInfo("sdbiter:%p, success applyed to sdb", pIter);
    code = 0;
  } while (0);

  sdbCloseIter(pIter);
  return code;
}
912

913
// Appends len bytes from pBuf to the iterator's snapshot file.
// Returns 0 and advances pIter->total when all bytes were written; otherwise
// logs the failure and returns terrno through TAOS_RETURN.
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
  int32_t writelen = taosWriteFile(pIter->file, pBuf, len);

  if (writelen == len) {
    pIter->total += writelen;
    mInfo("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, writelen, pIter->total);
    return 0;
  }

  int32_t code = terrno;
  mError("failed to write len:%d since %s, total:%" PRId64, len, tstrerror(code), pIter->total);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc