• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4875

09 Dec 2025 01:22AM UTC coverage: 64.472% (-0.2%) from 64.623%
#4875

push

travis-ci

guanshengliang
fix: temporarily disable memory leak detection for UDF tests (#33856)

162014 of 251293 relevant lines covered (64.47%)

104318075.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.04
/source/dnode/mnode/sdb/src/sdbFile.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "crypt.h"
18
#include "sdb.h"
19
#include "sync.h"
20
#include "tchecksum.h"
21
#include "tglobal.h"
22
#include "wal.h"
23

24
#define SDB_TABLE_SIZE   24
25
#define SDB_RESERVE_SIZE 512
26
#define SDB_FILE_VER     1
27

28
#define SDB_TABLE_SIZE_EXTRA   SDB_MAX
29
#define SDB_RESERVE_SIZE_EXTRA (512 - (SDB_TABLE_SIZE_EXTRA - SDB_TABLE_SIZE) * 2 * sizeof(int64_t))
30

31
static int32_t sdbDeployData(SSdb *pSdb) {
327,396✔
32
  int32_t code = 0;
327,396✔
33
  mInfo("start to deploy sdb");
327,396✔
34

35
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
13,423,236✔
36
    SdbDeployFp fp = pSdb->deployFps[i];
13,095,840✔
37
    if (fp == NULL) continue;
13,095,840✔
38

39
    mInfo("start to deploy sdb:%s", sdbTableName(i));
1,964,376✔
40
    code = (*fp)(pSdb->pMnode);
1,964,376✔
41
    if (code != 0) {
1,964,376✔
42
      mError("failed to deploy sdb:%s since %s", sdbTableName(i), tstrerror(code));
×
43
      return -1;
×
44
    }
45
  }
46

47
  mInfo("sdb deploy success");
327,396✔
48
  return 0;
327,396✔
49
}
50

51
static int32_t sdbUpgradeData(SSdb *pSdb, int32_t version) {
411,077✔
52
  int32_t code = 0;
411,077✔
53
  mInfo("start to upgrade sdb");
411,077✔
54

55
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
16,854,157✔
56
    SdbUpgradeFp fp = pSdb->upgradeFps[i];
16,443,080✔
57
    if (fp == NULL) continue;
16,443,080✔
58

59
    mInfo("start to upgrade sdb:%s", sdbTableName(i));
411,077✔
60
    code = (*fp)(pSdb->pMnode, version);
411,077✔
61
    if (code != 0) {
411,077✔
62
      mError("failed to upgrade sdb:%s since %s", sdbTableName(i), tstrerror(code));
×
63
      return -1;
×
64
    }
65
  }
66

67
  mInfo("sdb upgrade success");
411,077✔
68
  return 0;
411,077✔
69
}
70

71
static int32_t sdbAfterRestoredData(SSdb *pSdb) {
267,262✔
72
  int32_t code = 0;
267,262✔
73
  mInfo("start to prepare sdb");
267,262✔
74

75
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
10,957,742✔
76
    SdbAfterRestoredFp fp = pSdb->afterRestoredFps[i];
10,690,480✔
77
    if (fp == NULL) continue;
10,690,480✔
78

79
    mInfo("start to prepare sdb:%s", sdbTableName(i));
267,262✔
80
    code = (*fp)(pSdb->pMnode);
267,262✔
81
    if (code != 0) {
267,262✔
82
      mError("failed to prepare sdb:%s since %s", sdbTableName(i), tstrerror(code));
×
83
      return -1;
×
84
    }
85
  }
86

87
  mInfo("sdb prepare success");
267,262✔
88
  return 0;
267,262✔
89
}
90

91
static void sdbResetData(SSdb *pSdb) {
175,089✔
92
  mInfo("start to reset sdb");
175,089✔
93

94
  for (ESdbType i = 0; i < SDB_MAX; ++i) {
7,178,649✔
95
    SHashObj *hash = pSdb->hashObjs[i];
7,003,560✔
96
    if (hash == NULL) continue;
7,003,560✔
97

98
    sdbWriteLock(pSdb, i);
6,303,204✔
99

100
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
6,303,204✔
101
    while (ppRow != NULL) {
6,303,204✔
102
      SSdbRow *pRow = *ppRow;
×
103
      if (pRow == NULL) continue;
×
104

105
      sdbFreeRow(pSdb, pRow, true);
×
106
      ppRow = taosHashIterate(hash, ppRow);
×
107
    }
108

109
    taosHashClear(pSdb->hashObjs[i]);
6,303,204✔
110
    pSdb->tableVer[i] = 0;
6,303,204✔
111
    pSdb->maxId[i] = 0;
6,303,204✔
112

113
    sdbUnLock(pSdb, i);
6,303,204✔
114

115
    mInfo("sdb:%s is reset", sdbTableName(i));
6,303,204✔
116
  }
117

118
  pSdb->applyIndex = -1;
175,089✔
119
  pSdb->applyTerm = -1;
175,089✔
120
  pSdb->applyConfig = -1;
175,089✔
121
  pSdb->commitIndex = -1;
175,089✔
122
  pSdb->commitTerm = -1;
175,089✔
123
  pSdb->commitConfig = -1;
175,089✔
124
  mInfo("sdb reset success");
175,089✔
125
}
175,089✔
126

127
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
144,595✔
128
  int32_t code = 0;
144,595✔
129
  int64_t sver = 0;
144,595✔
130
  int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t));
144,595✔
131
  if (ret < 0) {
144,595✔
132
    return terrno;
×
133
  }
134
  if (ret != sizeof(int64_t)) {
144,595✔
135
    code = TSDB_CODE_FILE_CORRUPTED;
×
136
    TAOS_RETURN(code);
×
137
  }
138
  if (sver != SDB_FILE_VER) {
144,595✔
139
    code = TSDB_CODE_FILE_CORRUPTED;
×
140
    TAOS_RETURN(code);
×
141
  }
142

143
  ret = taosReadFile(pFile, &pSdb->applyIndex, sizeof(int64_t));
144,595✔
144
  if (ret < 0) {
144,595✔
145
    return terrno;
×
146
  }
147
  if (ret != sizeof(int64_t)) {
144,595✔
148
    code = TSDB_CODE_FILE_CORRUPTED;
×
149
    TAOS_RETURN(code);
×
150
  }
151

152
  ret = taosReadFile(pFile, &pSdb->applyTerm, sizeof(int64_t));
144,595✔
153
  if (ret < 0) {
144,595✔
154
    return terrno;
×
155
  }
156
  if (ret != sizeof(int64_t)) {
144,595✔
157
    code = TSDB_CODE_FILE_CORRUPTED;
×
158
    TAOS_RETURN(code);
×
159
  }
160

161
  ret = taosReadFile(pFile, &pSdb->applyConfig, sizeof(int64_t));
144,595✔
162
  if (ret < 0) {
144,595✔
163
    return terrno;
×
164
  }
165
  if (ret != sizeof(int64_t)) {
144,595✔
166
    code = TSDB_CODE_FILE_CORRUPTED;
×
167
    TAOS_RETURN(code);
×
168
  }
169

170
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
3,614,875✔
171
    int64_t maxId = 0;
3,470,280✔
172
    ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
3,470,280✔
173
    if (ret < 0) {
3,470,280✔
174
      return terrno;
×
175
    }
176
    if (ret != sizeof(int64_t)) {
3,470,280✔
177
      code = TSDB_CODE_FILE_CORRUPTED;
×
178
      TAOS_RETURN(code);
×
179
    }
180
    if (i < SDB_MAX) {
3,470,280✔
181
      pSdb->maxId[i] = maxId;
3,470,280✔
182
    }
183
  }
184

185
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
3,614,875✔
186
    int64_t ver = 0;
3,470,280✔
187
    ret = taosReadFile(pFile, &ver, sizeof(int64_t));
3,470,280✔
188
    if (ret < 0) {
3,470,280✔
189
      return terrno;
×
190
    }
191
    if (ret != sizeof(int64_t)) {
3,470,280✔
192
      code = TSDB_CODE_FILE_CORRUPTED;
×
193
      TAOS_RETURN(code);
×
194
    }
195
    if (i < SDB_MAX) {
3,470,280✔
196
      pSdb->tableVer[i] = ver;
3,470,280✔
197
    }
198
  }
199

200
  // for sdb compatibility
201
  for (int32_t i = SDB_TABLE_SIZE; i < SDB_TABLE_SIZE_EXTRA; ++i) {
2,458,115✔
202
    int64_t maxId = 0;
2,313,520✔
203
    ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
2,313,520✔
204
    if (ret < 0) {
2,313,520✔
205
      code = TAOS_SYSTEM_ERROR(ERRNO);
×
206
      TAOS_RETURN(code);
×
207
    }
208
    if (ret != sizeof(int64_t)) {
2,313,520✔
209
      code = TSDB_CODE_FILE_CORRUPTED;
×
210
      TAOS_RETURN(code);
×
211
    }
212
    if (i < SDB_MAX) {
2,313,520✔
213
      pSdb->maxId[i] = maxId;
2,313,520✔
214
    }
215

216
    int64_t ver = 0;
2,313,520✔
217
    ret = taosReadFile(pFile, &ver, sizeof(int64_t));
2,313,520✔
218
    if (ret < 0) {
2,313,520✔
219
      code = TAOS_SYSTEM_ERROR(ERRNO);
×
220
      TAOS_RETURN(code);
×
221
    }
222
    if (ret != sizeof(int64_t)) {
2,313,520✔
223
      code = TSDB_CODE_FILE_CORRUPTED;
×
224
      TAOS_RETURN(code);
×
225
    }
226
    if (i < SDB_MAX) {
2,313,520✔
227
      pSdb->tableVer[i] = ver;
2,313,520✔
228
    }
229
  }
230

231
  char reserve[SDB_RESERVE_SIZE_EXTRA] = {0};
144,595✔
232
  ret = taosReadFile(pFile, reserve, sizeof(reserve));
144,595✔
233
  if (ret < 0) {
144,595✔
234
    return terrno;
×
235
  }
236
  if (ret != sizeof(reserve)) {
144,595✔
237
    code = TSDB_CODE_FILE_CORRUPTED;
×
238
    TAOS_RETURN(code);
×
239
  }
240

241
  return 0;
144,595✔
242
}
243

244
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
924,933✔
245
  int64_t sver = SDB_FILE_VER;
924,933✔
246
  if (taosWriteFile(pFile, &sver, sizeof(int64_t)) != sizeof(int64_t)) {
924,933✔
247
    return terrno;
×
248
  }
249

250
  mInfo("vgId:1, write sdb file with sdb applyIndex:%" PRId64 " term:%" PRId64 " config:%" PRId64, pSdb->applyIndex,
924,933✔
251
        pSdb->applyTerm, pSdb->applyConfig);
252
  if (taosWriteFile(pFile, &pSdb->applyIndex, sizeof(int64_t)) != sizeof(int64_t)) {
924,933✔
253
    return terrno;
×
254
  }
255

256
  if (taosWriteFile(pFile, &pSdb->applyTerm, sizeof(int64_t)) != sizeof(int64_t)) {
924,933✔
257
    return terrno;
×
258
  }
259

260
  if (taosWriteFile(pFile, &pSdb->applyConfig, sizeof(int64_t)) != sizeof(int64_t)) {
924,933✔
261
    return terrno;
×
262
  }
263

264
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
23,123,325✔
265
    int64_t maxId = 0;
22,198,392✔
266
    if (i < SDB_MAX) {
22,198,392✔
267
      maxId = pSdb->maxId[i];
22,198,392✔
268
    }
269
    if (taosWriteFile(pFile, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
22,198,392✔
270
      return terrno;
×
271
    }
272
  }
273

274
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
23,123,325✔
275
    int64_t ver = 0;
22,198,392✔
276
    if (i < SDB_MAX) {
22,198,392✔
277
      ver = pSdb->tableVer[i];
22,198,392✔
278
    }
279
    if (taosWriteFile(pFile, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
22,198,392✔
280
      return terrno;
×
281
    }
282
  }
283

284
  // for sdb compatibility
285
  for (int32_t i = SDB_TABLE_SIZE; i < SDB_TABLE_SIZE_EXTRA; ++i) {
15,723,861✔
286
    int64_t maxId = 0;
14,798,928✔
287
    if (i < SDB_MAX) {
14,798,928✔
288
      maxId = pSdb->maxId[i];
14,798,928✔
289
    }
290
    if (taosWriteFile(pFile, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
14,798,928✔
291
      return terrno;
×
292
    }
293

294
    int64_t ver = 0;
14,798,928✔
295
    if (i < SDB_MAX) {
14,798,928✔
296
      ver = pSdb->tableVer[i];
14,798,928✔
297
    }
298
    if (taosWriteFile(pFile, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
14,798,928✔
299
      return terrno;
×
300
    }
301
  }
302

303
  char reserve[SDB_RESERVE_SIZE_EXTRA] = {0};
924,933✔
304
  if (taosWriteFile(pFile, reserve, sizeof(reserve)) != sizeof(reserve)) {
924,933✔
305
    return terrno;
×
306
  }
307

308
  return 0;
924,933✔
309
}
310

311
static int32_t sdbReadFileImp(SSdb *pSdb) {
175,089✔
312
  int64_t offset = 0;
175,089✔
313
  int32_t code = 0;
175,089✔
314
  int32_t readLen = 0;
175,089✔
315
  int64_t ret = 0;
175,089✔
316
  char    file[PATH_MAX] = {0};
175,089✔
317
  int32_t bufLen = TSDB_MAX_MSG_SIZE;
175,089✔
318

319
  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
175,089✔
320
  mInfo("start to read sdb file:%s", file);
175,089✔
321

322
  SSdbRaw *pRaw = taosMemoryMalloc(bufLen + 100);
175,089✔
323
  if (pRaw == NULL) {
175,089✔
324
    code = terrno;
×
325
    mError("failed read sdb file since %s", tstrerror(code));
×
326
    TAOS_RETURN(code);
×
327
  }
328

329
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
175,089✔
330
  if (pFile == NULL) {
175,089✔
331
    taosMemoryFree(pRaw);
30,494✔
332
    code = terrno;
30,494✔
333
    mInfo("read sdb file:%s finished since %s", file, tstrerror(code));
30,494✔
334
    return 0;
30,494✔
335
  }
336

337
  code = sdbReadFileHead(pSdb, pFile);
144,595✔
338
  if (code != 0) {
144,595✔
339
    mError("failed to read sdb file:%s head since %s", file, tstrerror(code));
×
340
    taosMemoryFree(pRaw);
×
341
    int32_t ret = 0;
×
342
    if ((ret = taosCloseFile(&pFile)) != 0) {
×
343
      mError("failed to close sdb file:%s since %s", file, tstrerror(ret));
×
344
    }
345
    return code;
×
346
  }
347

348
  int64_t tableVer[SDB_MAX] = {0};
144,595✔
349
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));
144,595✔
350

351
  while (1) {
19,857,829✔
352
    readLen = sizeof(SSdbRaw);
20,002,424✔
353
    ret = taosReadFile(pFile, pRaw, readLen);
20,002,424✔
354
    if (ret == 0) break;
20,002,424✔
355

356
    if (ret < 0) {
19,857,829✔
357
      code = terrno;
×
358
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
×
359
      goto _OVER;
×
360
    }
361

362
    if (ret != readLen) {
19,857,829✔
363
      code = TSDB_CODE_FILE_CORRUPTED;
×
364
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " != readLen:%d", file, tstrerror(code), ret, readLen);
×
365
      goto _OVER;
×
366
    }
367

368
    readLen = pRaw->dataLen + sizeof(int32_t);
19,857,829✔
369
    if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
19,857,829✔
370
      readLen = ENCRYPTED_LEN(pRaw->dataLen) + sizeof(int32_t);
×
371
    }
372
    if (readLen >= bufLen) {
19,857,829✔
373
      bufLen = pRaw->dataLen * 2;
×
374
      SSdbRaw *pNewRaw = taosMemoryMalloc(bufLen + 100);
×
375
      if (pNewRaw == NULL) {
×
376
        code = terrno;
×
377
        mError("failed read sdb file since malloc new sdbRaw size:%d failed", bufLen);
×
378
        goto _OVER;
×
379
      }
380
      mInfo("malloc new sdb raw size:%d, type:%d", bufLen, pRaw->type);
×
381
      memcpy(pNewRaw, pRaw, sizeof(SSdbRaw));
×
382
      sdbFreeRaw(pRaw);
×
383
      pRaw = pNewRaw;
×
384
    }
385

386
    ret = taosReadFile(pFile, pRaw->pData, readLen);
19,857,829✔
387
    if (ret < 0) {
19,857,829✔
388
      code = terrno;
×
389
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " readLen:%d", file, tstrerror(code), ret, readLen);
×
390
      goto _OVER;
×
391
    }
392

393
    if (ret != readLen) {
19,857,829✔
394
      code = TSDB_CODE_FILE_CORRUPTED;
×
395
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " != readLen:%d", file, tstrerror(code), ret, readLen);
×
396
      goto _OVER;
×
397
    }
398

399
    if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
19,857,829✔
400
      int32_t count = 0;
×
401

402
      char *plantContent = taosMemoryMalloc(ENCRYPTED_LEN(pRaw->dataLen));
×
403
      if (plantContent == NULL) {
×
404
        code = terrno;
×
405
        goto _OVER;
×
406
      }
407

408
      SCryptOpts opts = {0};
×
409
      opts.len = ENCRYPTED_LEN(pRaw->dataLen);
×
410
      opts.source = pRaw->pData;
×
411
      opts.result = plantContent;
×
412
      opts.unitLen = 16;
×
413
      tstrncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
×
414

415
      count = Builtin_CBC_Decrypt(&opts);
×
416

417
      // mDebug("read sdb, CBC Decrypt dataLen:%d, descrypted len:%d, %s", pRaw->dataLen, count, __FUNCTION__);
418

419
      memcpy(pRaw->pData, plantContent, pRaw->dataLen);
×
420
      taosMemoryFree(plantContent);
×
421
      memcpy(pRaw->pData + pRaw->dataLen, &pRaw->pData[ENCRYPTED_LEN(pRaw->dataLen)], sizeof(int32_t));
×
422
    }
423

424
    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
19,857,829✔
425
    if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
39,715,658✔
426
      code = TSDB_CODE_CHECKSUM_ERROR;
×
427
      mError("failed to read sdb file:%s since %s, readLen:%d", file, tstrerror(code), readLen);
×
428
      goto _OVER;
×
429
    }
430

431
    if (pRaw->type >= SDB_MAX) {
19,857,829✔
432
      mInfo("skip sdb raw type:%d since it is not supported", pRaw->type);
×
433
      continue;
×
434
    }
435

436
    code = sdbWriteWithoutFree(pSdb, pRaw);
19,857,829✔
437
    if (code != 0) {
19,857,829✔
438
      mError("failed to exec sdbWrite while read sdb file:%s since %s", file, terrstr());
×
439
      goto _OVER;
×
440
    }
441
  }
442

443
  code = 0;
144,595✔
444
  pSdb->commitIndex = pSdb->applyIndex;
144,595✔
445
  pSdb->commitTerm = pSdb->applyTerm;
144,595✔
446
  pSdb->commitConfig = pSdb->applyConfig;
144,595✔
447
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
144,595✔
448
  mInfo("vgId:1, trans:0, read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file,
144,595✔
449
        pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig);
450

451
_OVER:
142,819✔
452
  if ((ret = taosCloseFile(&pFile)) != 0) {
144,595✔
453
    mError("failed to close sdb file:%s since %s", file, tstrerror(ret));
×
454
  }
455
  sdbFreeRaw(pRaw);
144,595✔
456

457
  TAOS_RETURN(code);
144,595✔
458
}
459

460
int32_t sdbReadFile(SSdb *pSdb) {
175,089✔
461
  (void)taosThreadMutexLock(&pSdb->filelock);
175,089✔
462

463
  sdbResetData(pSdb);
175,089✔
464
  int32_t code = sdbReadFileImp(pSdb);
175,089✔
465
  if (code != 0) {
175,089✔
466
    mError("failed to read sdb file since %s", tstrerror(code));
×
467
    sdbResetData(pSdb);
×
468
  }
469

470
  (void)taosThreadMutexUnlock(&pSdb->filelock);
175,089✔
471
  return code;
175,089✔
472
}
473

474
static int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) {
924,933✔
475
  int32_t code = 0;
924,933✔
476

477
  char tmpfile[PATH_MAX] = {0};
924,933✔
478
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
924,933✔
479
  char curfile[PATH_MAX] = {0};
924,933✔
480
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
924,933✔
481

482
  mInfo("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
924,933✔
483
        " term:%" PRId64 " config:%" PRId64 ", file:%s",
484
        pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
485
        curfile);
486

487
  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
924,933✔
488
  if (pFile == NULL) {
924,933✔
489
    code = terrno;
×
490
    mError("failed to open sdb file:%s for write since %s", tmpfile, tstrerror(code));
×
491
    TAOS_RETURN(code);
×
492
  }
493

494
  code = sdbWriteFileHead(pSdb, pFile);
924,933✔
495
  if (code != 0) {
924,933✔
496
    mError("failed to write sdb file:%s head since %s", tmpfile, tstrerror(code));
×
497
    int32_t ret = 0;
×
498
    if ((ret = taosCloseFile(&pFile)) != 0) {
×
499
      mError("failed to close sdb file:%s since %s", tmpfile, tstrerror(ret));
×
500
    }
501
    return code;
×
502
  }
503

504
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
37,922,253✔
505
    if (i == skip_type) continue;
36,997,320✔
506
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
36,997,320✔
507
    if (encodeFp == NULL) continue;
36,997,320✔
508

509
    mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
32,372,655✔
510

511
    SHashObj *hash = pSdb->hashObjs[i];
32,372,655✔
512
    sdbWriteLock(pSdb, i);
32,372,655✔
513

514
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
32,372,655✔
515
    while (ppRow != NULL) {
150,507,270✔
516
      SSdbRow *pRow = *ppRow;
118,134,615✔
517
      if (pRow == NULL) {
118,134,615✔
518
        ppRow = taosHashIterate(hash, ppRow);
×
519
        continue;
×
520
      }
521

522
      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
118,134,615✔
523
        sdbPrintOper(pSdb, pRow, "not-write");
59,581✔
524
        ppRow = taosHashIterate(hash, ppRow);
59,581✔
525
        continue;
59,581✔
526
      }
527

528
      sdbPrintOper(pSdb, pRow, "write");
118,075,034✔
529

530
      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
118,075,034✔
531
      if (pRaw != NULL) {
118,075,034✔
532
        pRaw->status = pRow->status;
118,075,034✔
533

534
        if (taosWriteFile(pFile, pRaw, sizeof(SSdbRaw)) != sizeof(SSdbRaw)) {
118,075,034✔
535
          code = terrno;
×
536
          taosHashCancelIterate(hash, ppRow);
×
537
          sdbFreeRaw(pRaw);
×
538
          break;
×
539
        }
540

541
        int32_t newDataLen = pRaw->dataLen;
118,075,034✔
542
        char   *newData = pRaw->pData;
118,075,034✔
543
        if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
118,075,034✔
544
          newDataLen = ENCRYPTED_LEN(pRaw->dataLen);
×
545
          newData = taosMemoryMalloc(newDataLen);
×
546
          if (newData == NULL) {
×
547
            code = terrno;
×
548
            taosHashCancelIterate(hash, ppRow);
×
549
            sdbFreeRaw(pRaw);
×
550
            break;
×
551
          }
552

553
          SCryptOpts opts = {0};
×
554
          opts.len = newDataLen;
×
555
          opts.source = pRaw->pData;
×
556
          opts.result = newData;
×
557
          opts.unitLen = 16;
×
558
          tstrncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
×
559

560
          int32_t count = Builtin_CBC_Encrypt(&opts);
×
561

562
          // mDebug("write sdb, CBC Encrypt encryptedDataLen:%d, dataLen:%d, %s",
563
          //       newDataLen, pRaw->dataLen, __FUNCTION__);
564
        }
565

566
        if (taosWriteFile(pFile, newData, newDataLen) != newDataLen) {
118,075,034✔
567
          code = terrno;
×
568
          taosHashCancelIterate(hash, ppRow);
×
569
          sdbFreeRaw(pRaw);
×
570
          break;
×
571
        }
572

573
        if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
118,075,034✔
574
          taosMemoryFree(newData);
×
575
        }
576

577
        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
118,075,034✔
578
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
118,075,034✔
579
          code = terrno;
×
580
          taosHashCancelIterate(hash, ppRow);
×
581
          sdbFreeRaw(pRaw);
×
582
          break;
×
583
        }
584
      } else {
585
        code = TSDB_CODE_APP_ERROR;
×
586
        taosHashCancelIterate(hash, ppRow);
×
587
        break;
×
588
      }
589

590
      sdbFreeRaw(pRaw);
118,075,034✔
591
      ppRow = taosHashIterate(hash, ppRow);
118,075,034✔
592
    }
593
    sdbUnLock(pSdb, i);
32,372,655✔
594
  }
595

596
  if (code == 0) {
924,933✔
597
    code = taosFsyncFile(pFile);
924,933✔
598
    if (code != 0) {
924,933✔
599
      code = TAOS_SYSTEM_ERROR(ERRNO);
×
600
      mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code));
×
601
    }
602
  }
603

604
  if (taosCloseFile(&pFile) != 0) {
924,933✔
605
    code = taosRenameFile(tmpfile, curfile);
×
606
  }
607

608
  if (code == 0) {
924,933✔
609
    code = taosRenameFile(tmpfile, curfile);
924,933✔
610
    if (code != 0) {
924,933✔
611
      mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
×
612
    }
613
  }
614

615
  if (code != 0) {
924,933✔
616
    mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
×
617
  } else {
618
    pSdb->commitIndex = pSdb->applyIndex;
924,933✔
619
    pSdb->commitTerm = pSdb->applyTerm;
924,933✔
620
    pSdb->commitConfig = pSdb->applyConfig;
924,933✔
621
    mInfo("vgId:1, trans:0, write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64
924,933✔
622
          " file:%s",
623
          pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
624
  }
625

626
  terrno = code;
924,933✔
627
  return code;
924,933✔
628
}
629

630
int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
42,968,231✔
631
  int32_t code = 0;
42,968,231✔
632
  if (pSdb->applyIndex == pSdb->commitIndex) {
42,968,231✔
633
    return 0;
911,455✔
634
  }
635

636
  if (pSdb->applyIndex - pSdb->commitIndex < delta) {
42,056,776✔
637
    return 0;
41,126,868✔
638
  }
639

640
  (void)taosThreadMutexLock(&pSdb->filelock);
929,908✔
641
  if (pSdb->pWal != NULL) {
929,908✔
642
    if (pSdb->sync > 0) {
929,908✔
643
      code = syncBeginSnapshot(pSdb->sync, pSdb->applyIndex);
929,908✔
644
    }
645
  }
646
  if (code == 0) {
929,908✔
647
    code = sdbWriteFileImp(pSdb, -1);
924,933✔
648
  }
649
  if (code == 0) {
929,908✔
650
    if (pSdb->pWal != NULL) {
924,933✔
651
      if (pSdb->sync > 0) {
924,933✔
652
        code = syncEndSnapshot(pSdb->sync, false);
924,933✔
653
      }
654
    }
655
  }
656
  if (code != 0) {
929,908✔
657
    mError("failed to write sdb file since %s", tstrerror(code));
4,975✔
658
  } else {
659
    mInfo("vgId:1, trans:0, write sdb file success, apply index:%" PRId64 ", term:%" PRId64 ", config:%" PRId64,
924,933✔
660
          pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig);
661
  }
662
  (void)taosThreadMutexUnlock(&pSdb->filelock);
929,908✔
663
  return code;
929,908✔
664
}
665

666
int32_t sdbWriteFileForDump(SSdb *pSdb, int32_t skip_type) {
×
667
  int32_t code = 0;
×
668

669
  code = sdbWriteFileImp(pSdb, skip_type);
×
670

671
  return code;
×
672
}
673

674
int32_t sdbDeploy(SSdb *pSdb) {
327,396✔
675
  int32_t code = 0;
327,396✔
676
  code = sdbDeployData(pSdb);
327,396✔
677
  if (code != 0) {
327,396✔
678
    TAOS_RETURN(code);
×
679
  }
680

681
  code = sdbWriteFile(pSdb, 0);
327,396✔
682
  if (code != 0) {
327,396✔
683
    TAOS_RETURN(code);
×
684
  }
685

686
  return 0;
327,396✔
687
}
688

689
int32_t sdbUpgrade(SSdb *pSdb, int32_t version) {
411,077✔
690
  int32_t code = 0;
411,077✔
691
  code = sdbUpgradeData(pSdb, version);
411,077✔
692
  if (code != 0) {
411,077✔
693
    TAOS_RETURN(code);
×
694
  }
695

696
  code = sdbWriteFile(pSdb, 0);
411,077✔
697
  if (code != 0) {
411,077✔
698
    TAOS_RETURN(code);
×
699
  }
700

701
  return 0;
411,077✔
702
}
703

704
int32_t sdbAfterRestored(SSdb *pSdb) {
267,262✔
705
  int32_t code = 0;
267,262✔
706
  code = sdbAfterRestoredData(pSdb);
267,262✔
707
  if (code != 0) {
267,262✔
708
    TAOS_RETURN(code);
×
709
  }
710
  return 0;
267,262✔
711
}
712

713
static SSdbIter *sdbCreateIter(SSdb *pSdb) {
×
714
  SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
×
715
  if (pIter == NULL) {
×
716
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
717
    return NULL;
×
718
  }
719

720
  char name[PATH_MAX + 100] = {0};
×
721
  snprintf(name, sizeof(name), "%s%ssdb.data.%" PRIu64, pSdb->tmpDir, TD_DIRSEP, (uint64_t)pIter);
×
722
  pIter->name = taosStrdup(name);
×
723
  if (pIter->name == NULL) {
×
724
    taosMemoryFree(pIter);
×
725
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
726
    return NULL;
×
727
  }
728

729
  return pIter;
×
730
}
731

732
static void sdbCloseIter(SSdbIter *pIter) {
×
733
  if (pIter == NULL) return;
×
734

735
  if (pIter->file != NULL) {
×
736
    int32_t ret = 0;
×
737
    if ((ret = taosCloseFile(&pIter->file)) != 0) {
×
738
      mError("failed to close sdb file since %s", tstrerror(ret));
×
739
    }
740
    pIter->file = NULL;
×
741
  }
742

743
  if (pIter->name != NULL) {
×
744
    int32_t ret = 0;
×
745
    if ((ret = taosRemoveFile(pIter->name)) != 0) {
×
746
      mError("failed to remove sdb file:%s since %s", pIter->name, tstrerror(ret));
×
747
    }
748
    taosMemoryFree(pIter->name);
×
749
    pIter->name = NULL;
×
750
  }
751

752
  mInfo("sdbiter:%p, is closed, total:%" PRId64, pIter, pIter->total);
×
753
  taosMemoryFree(pIter);
×
754
}
755

756
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config) {
×
757
  int32_t   code = 0;
×
758
  SSdbIter *pIter = sdbCreateIter(pSdb);
×
759
  if (pIter == NULL) return -1;
×
760

761
  char datafile[PATH_MAX] = {0};
×
762
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
×
763

764
  (void)taosThreadMutexLock(&pSdb->filelock);
×
765
  int64_t commitIndex = pSdb->commitIndex;
×
766
  int64_t commitTerm = pSdb->commitTerm;
×
767
  int64_t commitConfig = pSdb->commitConfig;
×
768
  if (taosCopyFile(datafile, pIter->name) < 0) {
×
769
    code = terrno;
×
770
    (void)taosThreadMutexUnlock(&pSdb->filelock);
×
771
    mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, tstrerror(code));
×
772
    sdbCloseIter(pIter);
×
773
    TAOS_RETURN(code);
×
774
  }
775
  (void)taosThreadMutexUnlock(&pSdb->filelock);
×
776

777
  pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
×
778
  if (pIter->file == NULL) {
×
779
    code = terrno;
×
780
    mError("failed to open sdb file:%s since %s", pIter->name, tstrerror(code));
×
781
    sdbCloseIter(pIter);
×
782
    TAOS_RETURN(code);
×
783
  }
784

785
  *ppIter = pIter;
×
786
  if (index != NULL) *index = commitIndex;
×
787
  if (term != NULL) *term = commitTerm;
×
788
  if (config != NULL) *config = commitConfig;
×
789

790
  mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
×
791
        pIter, commitIndex, commitTerm, commitConfig, pIter->name);
792
  return 0;
×
793
}
794

795
void sdbStopRead(SSdb *pSdb, SSdbIter *pIter) { sdbCloseIter(pIter); }
×
796

797
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
×
798
  int32_t code = 0;
×
799
  int32_t maxlen = 4096;
×
800
  void   *pBuf = taosMemoryCalloc(1, maxlen);
×
801
  if (pBuf == NULL) {
×
802
    code = terrno;
×
803
    TAOS_RETURN(code);
×
804
  }
805

806
  int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
×
807
  if (readlen < 0 || readlen > maxlen) {
×
808
    code = terrno;
×
809
    mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, tstrerror(code), pIter->total);
×
810
    *ppBuf = NULL;
×
811
    *len = 0;
×
812
    taosMemoryFree(pBuf);
×
813
    TAOS_RETURN(code);
×
814
  } else if (readlen == 0) {
×
815
    mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
×
816
    *ppBuf = NULL;
×
817
    *len = 0;
×
818
    taosMemoryFree(pBuf);
×
819
    return 0;
×
820
  } else {  // (readlen <= maxlen)
821
    pIter->total += readlen;
×
822
    mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
×
823
    *ppBuf = pBuf;
×
824
    *len = readlen;
×
825
    return 0;
×
826
  }
827
}
828

829
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
×
830
  int32_t   code = 0;
×
831
  SSdbIter *pIter = sdbCreateIter(pSdb);
×
832
  if (pIter == NULL) {
×
833
    code = terrno;
×
834
    TAOS_RETURN(code);
×
835
  }
836

837
  pIter->file = taosOpenFile(pIter->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
×
838
  if (pIter->file == NULL) {
×
839
    code = terrno;
×
840
    mError("failed to open %s since %s", pIter->name, tstrerror(code));
×
841
    sdbCloseIter(pIter);
×
842
    TAOS_RETURN(code);
×
843
  }
844

845
  *ppIter = pIter;
×
846
  mInfo("sdbiter:%p, is created to write snapshot, file:%s", pIter, pIter->name);
×
847
  return 0;
×
848
}
849

850
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, int64_t term, int64_t config) {
×
851
  int32_t code = -1;
×
852

853
  if (!isApply) {
×
854
    mInfo("sdbiter:%p, not apply to sdb", pIter);
×
855
    code = 0;
×
856
    goto _OVER;
×
857
  }
858

859
  if (taosFsyncFile(pIter->file) != 0) {
×
860
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
861
    mError("sdbiter:%p, failed to fasync file %s since %s", pIter, pIter->name, tstrerror(code));
×
862
    goto _OVER;
×
863
  }
864

865
  if (taosCloseFile(&pIter->file) != 0) {
×
866
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
867
    goto _OVER;
×
868
  }
869
  pIter->file = NULL;
×
870

871
  char datafile[PATH_MAX] = {0};
×
872
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
×
873
  code = taosRenameFile(pIter->name, datafile);
×
874
  if (code != 0) {
×
875
    mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, tstrerror(code));
×
876
    goto _OVER;
×
877
  }
878

879
  code = sdbReadFile(pSdb);
×
880
  if (code != 0) {
×
881
    mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, tstrerror(code));
×
882
    goto _OVER;
×
883
  }
884

885
  if (config > 0) {
×
886
    pSdb->commitConfig = config;
×
887
  }
888
  if (term > 0) {
×
889
    pSdb->commitTerm = term;
×
890
  }
891
  if (index > 0) {
×
892
    pSdb->commitIndex = index;
×
893
  }
894

895
  mInfo("sdbiter:%p, success applyed to sdb", pIter);
×
896
  code = 0;
×
897

898
_OVER:
×
899
  sdbCloseIter(pIter);
×
900
  return code;
×
901
}
902

903
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
×
904
  int32_t code = 0;
×
905
  int32_t writelen = taosWriteFile(pIter->file, pBuf, len);
×
906
  if (writelen != len) {
×
907
    code = terrno;
×
908
    mError("failed to write len:%d since %s, total:%" PRId64, len, tstrerror(code), pIter->total);
×
909
    TAOS_RETURN(code);
×
910
  }
911

912
  pIter->total += writelen;
×
913
  mInfo("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, writelen, pIter->total);
×
914
  return 0;
×
915
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc