• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4308

14 Jun 2025 02:06PM UTC coverage: 62.454% (-0.3%) from 62.777%
#4308

push

travis-ci

web-flow
fix: taosdump windows pthread_mutex_unlock crash(3.0) (#31357)

* fix: windows pthread_mutex_unlock crash

* enh: sync from main fix taosdump crash windows

* fix: restore .github action branch to main

153985 of 315105 branches covered (48.87%)

Branch coverage included in aggregate %.

238120 of 312727 relevant lines covered (76.14%)

6462519.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.87
/source/libs/wal/src/walWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "crypt.h"
17
#include "os.h"
18
#include "taoserror.h"
19
#include "tchecksum.h"
20
#include "tglobal.h"
21
#include "walInt.h"
22

23
/**
 * Reset the WAL so that it continues right after a snapshot taken at `ver`.
 *
 * While holding the WAL write lock this:
 *   1. refuses to proceed if any entry in pRefHash has a refVer that is set
 *      (!= -1) and is <= ver;
 *   2. closes the current log/idx file handles;
 *   3. removes the log and idx file of every entry in fileInfoSet (only when
 *      vers.firstVer != -1);
 *   4. removes the meta via walRemoveMeta();
 *   5. clears fileInfoSet and resets the version/size bookkeeping.
 *
 * Every exit path goes through `_exit` so the lock is always released; the
 * original returned straight out of walRemoveMeta() failures with the lock
 * still held.
 *
 * @param pWal WAL handle.
 * @param ver  snapshot version; afterwards firstVer == ver + 1 and
 *             lastVer == commitVer == snapshotVer == ver.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_FAILED when a reference blocks the
 *         restore, terrno when a file cannot be removed, or the error
 *         returned by walRemoveMeta().
 */
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
  int32_t code = 0;

  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));

  wInfo("vgId:%d, restore from snapshot, index:%" PRId64, pWal->cfg.vgId, ver);

  void *pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pWal->pRefHash, pIter);
    if (pIter == NULL) break;
    SWalRef *pRef = *(SWalRef **)pIter;
    if (pRef->refVer != -1 && pRef->refVer <= ver) {
      // the iteration is abandoned early, so it has to be cancelled explicitly
      taosHashCancelIterate(pWal->pRefHash, pIter);
      code = TSDB_CODE_FAILED;
      goto _exit;
    }
  }

  TAOS_UNUSED(taosCloseFile(&pWal->pLogFile));
  TAOS_UNUSED(taosCloseFile(&pWal->pIdxFile));

  if (pWal->vers.firstVer != -1) {
    int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
    for (int32_t i = 0; i < fileSetSize; i++) {
      SWalFileInfo *pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
      char          fnameStr[WAL_FILE_LEN];

      walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
      if (taosRemoveFile(fnameStr) < 0) {
        wError("vgId:%d, restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
        code = terrno;
        goto _exit;
      }
      wInfo("vgId:%d, restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);

      walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
      if (taosRemoveFile(fnameStr) < 0) {
        wError("vgId:%d, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
        code = terrno;
        goto _exit;
      }
      wInfo("vgId:%d, restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
    }
  }

  // must not return directly here: the write lock is still held
  code = walRemoveMeta(pWal);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  pWal->writeCur = -1;
  pWal->totSize = 0;
  pWal->lastRollSeq = -1;

  taosArrayClear(pWal->fileInfoSet);
  pWal->vers.firstVer = ver + 1;
  pWal->vers.lastVer = ver;
  pWal->vers.commitVer = ver;
  pWal->vers.snapshotVer = ver;
  pWal->vers.verInSnapshotting = -1;

_exit:
  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));

  TAOS_RETURN(code);
}
88

89
/**
 * Record `ver` as the applied version of the WAL.
 *
 * The value is stored as-is in vers.appliedVer; no validation and no locking
 * is done here.
 */
void walApplyVer(SWal *pWal, int64_t ver) {
  // TODO: error check
  pWal->vers.appliedVer = ver;
}
3,115,639✔
93

94
/**
 * Advance the commit version of the WAL to `ver`.
 *
 * A `ver` below the current commit version is silently accepted and leaves
 * the state untouched. Otherwise `ver` must not exceed vers.lastVer and the
 * current commit version must not be below vers.snapshotVer.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_WAL_INVALID_VER when the range
 *         check fails.
 */
int32_t walCommit(SWal *pWal, int64_t ver) {
  if (ver >= pWal->vers.commitVer) {
    const bool acceptable = (ver <= pWal->vers.lastVer) && (pWal->vers.commitVer >= pWal->vers.snapshotVer);
    if (!acceptable) {
      TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
    }

    pWal->vers.commitVer = ver;
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
102

103
/**
 * Switch the write files of the WAL to the file set that contains `ver`.
 *
 * The currently open log/idx files are fsynced (unless the WAL level is
 * TAOS_WAL_SKIP) and closed. The file whose firstVer is the greatest one
 * <= ver is then looked up in fileInfoSet (TD_LE search), its idx and log
 * files are opened in create/write/append mode and installed as the current
 * write files, and writeCur is set to the index of that entry.
 *
 * @param pWal WAL handle.
 * @param ver  version that the selected file must cover.
 * @return firstVer of the selected file on success, -1 on failure. When no
 *         file is found for `ver`, terrno is set to TSDB_CODE_WAL_INVALID_VER
 *         (the original dereferenced a NULL entry in that case).
 */
int64_t walChangeWrite(SWal *pWal, int64_t ver) {
  int       code;
  TdFilePtr pIdxTFile, pLogTFile;
  char      fnameStr[WAL_FILE_LEN];
  if (pWal->pLogFile != NULL) {
    if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pLogFile)) != 0) {
      return -1;
    }
    code = taosCloseFile(&pWal->pLogFile);
    if (code != 0) {
      return -1;
    }
  }
  if (pWal->pIdxFile != NULL) {
    if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pIdxFile)) != 0) {
      return -1;
    }
    code = taosCloseFile(&pWal->pIdxFile);
    if (code != 0) {
      return -1;
    }
  }

  // zero-initialise the search key so no indeterminate field is ever read
  SWalFileInfo tmpInfo = {0};
  tmpInfo.firstVer = ver;
  // bsearch in fileSet
  int32_t       idx = taosArraySearchIdx(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
  SWalFileInfo *pFileInfo = (idx < 0) ? NULL : taosArrayGet(pWal->fileInfoSet, idx);
  if (pFileInfo == NULL) {
    // no file starts at or before `ver`; report it instead of crashing
    wError("vgId:%d, no wal file found for index:%" PRId64, pWal->cfg.vgId, ver);
    terrno = TSDB_CODE_WAL_INVALID_VER;
    return -1;
  }

  int64_t fileFirstVer = pFileInfo->firstVer;
  walBuildIdxName(pWal, fileFirstVer, fnameStr);
  pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
  if (pIdxTFile == NULL) {
    pWal->pIdxFile = NULL;

    return -1;
  }
  walBuildLogName(pWal, fileFirstVer, fnameStr);
  pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
  if (pLogTFile == NULL) {
    // do not leak the idx handle that was opened just above
    TAOS_UNUSED(taosCloseFile(&pIdxTFile));
    pWal->pLogFile = NULL;

    return -1;
  }

  pWal->pLogFile = pLogTFile;
  pWal->pIdxFile = pIdxTFile;
  pWal->writeCur = idx;

  return fileFirstVer;
}
156

157
/**
 * Roll the WAL back so that `ver` and everything after it is discarded.
 *
 * Runs under the WAL write lock. `ver` must be <= vers.lastVer and strictly
 * greater than both vers.commitVer and vers.snapshotVer. When `ver` lies in
 * an earlier file than the last one, the write files are switched with
 * walChangeWrite() and every later file is popped from fileInfoSet and
 * removed (removal failures only produce a warning). The idx entry of `ver`
 * is then read to find its log offset, the log head at that offset is
 * validated (checksum and version), both files are truncated, the in-memory
 * bookkeeping is updated, and the meta is saved.
 *
 * @return 0 on success, otherwise the error code that stopped the rollback.
 */
int32_t walRollback(SWal *pWal, int64_t ver) {
  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
  wInfo("vgId:%d, wal rollback for index:%" PRId64, pWal->cfg.vgId, ver);

  int32_t   code = 0;
  int32_t   lino = 0;
  int64_t   seekRes;
  char      path[WAL_FILE_LEN];
  TdFilePtr pIdx = NULL;
  TdFilePtr pLog = NULL;

  const bool rollbackable = (ver <= pWal->vers.lastVer) && (ver > pWal->vers.commitVer) && (ver > pWal->vers.snapshotVer);
  if (!rollbackable) {
    code = TSDB_CODE_WAL_INVALID_VER;
    goto _exit;
  }

  // locate the file holding `ver`; drop all files after it
  if (ver < walGetLastFileFirstVer(pWal)) {
    TAOS_CHECK_EXIT_SET_CODE(walChangeWrite(pWal, ver), code, terrno);

    // files are popped from the tail, i.e. removed in descending order
    int total = taosArrayGetSize(pWal->fileInfoSet);
    int remaining = total - (pWal->writeCur + 1);
    while (remaining-- > 0) {
      SWalFileInfo *pDropped = taosArrayPop(pWal->fileInfoSet);

      walBuildLogName(pWal, pDropped->firstVer, path);
      wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, path);
      if (taosRemoveFile(path) != 0) {
        wWarn("vgId:%d, failed to remove file %s for rollback since %s", pWal->cfg.vgId, path, terrstr());
      }

      walBuildIdxName(pWal, pDropped->firstVer, path);
      wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, path);
      if (taosRemoveFile(path) != 0) {
        wWarn("vgId:%d, failed to remove file %s for rollback since %s", pWal->cfg.vgId, path, terrstr());
      }
    }
  }

  walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), path);
  pIdx = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
  TSDB_CHECK_NULL(pIdx, code, lino, _exit, terrno);

  int64_t idxPos = walGetVerIdxOffset(pWal, ver);
  seekRes = taosLSeekFile(pIdx, idxPos, SEEK_SET);
  if (seekRes < 0) {
    code = terrno;
    goto _exit;
  }

  // the idx entry tells where the record of `ver` starts in the log file
  SWalIdxEntry idxEntry;
  if (taosReadFile(pIdx, &idxEntry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
    code = terrno;
    goto _exit;
  }

  walBuildLogName(pWal, walGetCurFileFirstVer(pWal), path);
  pLog = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
  wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, path);
  TSDB_CHECK_NULL(pLog, code, lino, _exit, terrno);

  seekRes = taosLSeekFile(pLog, idxEntry.offset, SEEK_SET);
  if (seekRes < 0) {
    // TODO
    code = terrno;
    goto _exit;
  }

  // make sure the offset really points at the head of `ver`
  SWalCkHead logHead;
  if (taosReadFile(pLog, &logHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
    code = terrno;
    goto _exit;
  }
  code = walValidHeadCksum(&logHead);

  if (code != 0 || logHead.head.version != ver) {
    code = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _exit;
  }

  // cut both files at the start of `ver`
  code = taosFtruncateFile(pLog, idxEntry.offset);
  if (code < 0) goto _exit;

  code = taosFtruncateFile(pIdx, idxPos);
  if (code < 0) goto _exit;

  pWal->vers.lastVer = ver - 1;
  SWalFileInfo *pTail = (SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet);
  pTail->lastVer = ver - 1;
  pTail->fileSize = idxEntry.offset;

  TAOS_CHECK_EXIT(walSaveMeta(pWal));

_exit:
  if (code != 0) {
    wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
  }
  TAOS_UNUSED(taosCloseFile(&pIdx));
  TAOS_UNUSED(taosCloseFile(&pLog));
  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));

  TAOS_RETURN(code);
}
255

256
/**
 * Close the current write files and start a new log/idx file pair.
 *
 * Nothing is done when the WAL level is TAOS_WAL_SKIP and both files are
 * already open. Otherwise the open idx and log files are fsynced and closed,
 * a new pair named after vers.lastVer + 1 is created, a new file-info entry
 * is appended via walRollFileInfo(), the new handles become the current
 * write files, lastRollSeq is refreshed and the meta is saved.
 *
 * Handles opened here are closed again if the function fails before they are
 * handed over to `pWal` (the original leaked them when the log file could
 * not be opened or walRollFileInfo() failed).
 *
 * @return 0 on success, otherwise the error code of the failing step.
 */
int32_t walRollImpl(SWal *pWal) {
  int32_t code = 0, lino = 0;
  // declared up front so that every jump to _exit sees initialised handles
  TdFilePtr pIdxFile = NULL, pLogFile = NULL;

  if (pWal->cfg.level == TAOS_WAL_SKIP && pWal->pIdxFile != NULL && pWal->pLogFile != NULL) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }
  if (pWal->pIdxFile != NULL) {
    if ((code = taosFsyncFile(pWal->pIdxFile)) != 0) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
    code = taosCloseFile(&pWal->pIdxFile);
    if (code != 0) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

  if (pWal->pLogFile != NULL) {
    if ((code = taosFsyncFile(pWal->pLogFile)) != 0) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
    code = taosCloseFile(&pWal->pLogFile);
    if (code != 0) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

  // create new file
  int64_t newFileFirstVer = pWal->vers.lastVer + 1;
  char    fnameStr[WAL_FILE_LEN];
  walBuildIdxName(pWal, newFileFirstVer, fnameStr);
  pIdxFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_APPEND);
  TSDB_CHECK_NULL(pIdxFile, code, lino, _exit, terrno);

  walBuildLogName(pWal, newFileFirstVer, fnameStr);
  pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_APPEND);
  wDebug("vgId:%d, wal create new file for write:%s", pWal->cfg.vgId, fnameStr);
  TSDB_CHECK_NULL(pLogFile, code, lino, _exit, terrno);

  TAOS_CHECK_GOTO(walRollFileInfo(pWal), &lino, _exit);

  // switch file; from here on the handles belong to pWal
  pWal->pIdxFile = pIdxFile;
  pWal->pLogFile = pLogFile;
  pIdxFile = NULL;
  pLogFile = NULL;
  pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;

  pWal->lastRollSeq = walGetSeq();

  TAOS_CHECK_GOTO(walSaveMeta(pWal), &lino, _exit);

_exit:
  if (code) {
    wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));

    // release handles that were opened but never handed over to pWal
    if (pIdxFile != NULL) {
      TAOS_UNUSED(taosCloseFile(&pIdxFile));
    }
    if (pLogFile != NULL) {
      TAOS_UNUSED(taosCloseFile(&pLogFile));
    }
  }

  TAOS_RETURN(code);
}
313

314
/**
 * Roll to a new WAL file when required, before a record is appended.
 *
 * - With an empty fileInfoSet a roll is always performed.
 * - Otherwise a roll happens when cfg.rollPeriod is enabled (neither -1 nor
 *   0) and more than that has passed since lastRollSeq, or when cfg.segSize
 *   is enabled and the last file is larger than it.
 * - Finally the meta is saved when the cached size of the last file exceeds
 *   tsWalFsyncDataSizeLimit.
 *
 * @return TSDB_CODE_SUCCESS, or the error of the failing roll/save.
 */
static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
  if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
    TAOS_CHECK_RETURN(walRollImpl(pWal));

    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  int64_t elapsed = walGetSeq() - pWal->lastRollSeq;

  const bool periodExpired =
      (pWal->cfg.rollPeriod != -1) && (pWal->cfg.rollPeriod != 0) && (elapsed > pWal->cfg.rollPeriod);

  // the size check is only evaluated when the period check did not trigger
  if (periodExpired ||
      ((pWal->cfg.segSize != -1) && (pWal->cfg.segSize != 0) && (walGetLastFileSize(pWal) > pWal->cfg.segSize))) {
    TAOS_CHECK_RETURN(walRollImpl(pWal));
  }

  if (walGetLastFileCachedSize(pWal) > tsWalFsyncDataSizeLimit) {
    TAOS_CHECK_RETURN(walSaveMeta(pWal));
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
334

335
/**
 * Mark the start of a snapshot at version `ver`.
 *
 * Does nothing for WAL level TAOS_WAL_SKIP and rejects a negative
 * `logRetention`. Otherwise, under the write lock, it records `ver` in
 * vers.verInSnapshotting and `logRetention` in vers.logRetention, and rolls
 * to a new file when the last file is not empty.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_FAILED for a negative retention, or
 *         the code returned by walRollImpl().
 */
int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pWal->cfg.level == TAOS_WAL_SKIP) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  if (logRetention < 0) {
    TAOS_RETURN(TSDB_CODE_FAILED);
  }

  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));

  pWal->vers.verInSnapshotting = ver;
  pWal->vers.logRetention = logRetention;

  wDebug("vgId:%d, wal begin snapshot for index:%" PRId64 ", log retention:%" PRId64 " first index:%" PRId64
         ", last index:%" PRId64,
         pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);

  // check file rolling: only a non-empty last file needs to be closed off
  if (walGetLastFileSize(pWal) != 0) {
    code = walRollImpl(pWal);
  }

  if (code) {
    wError("vgId:%d, %s failed since %s at line %d", pWal->cfg.vgId, __func__, tstrerror(code), lino);
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
  TAOS_RETURN(code);
}
364

365
/**
 * Finish the snapshot started by walBeginSnapshot() and recycle old files.
 *
 * Does nothing for WAL level TAOS_WAL_SKIP. Under the write lock it:
 *   - fails with TSDB_CODE_FAILED when no snapshot is in progress
 *     (vers.verInSnapshotting == -1);
 *   - stores the snapshot version and computes the highest version that may
 *     be dropped (snapshot version minus vers.logRetention, but not below
 *     firstVer - 1; additionally capped by the smallest refVer - 1 found in
 *     pRefHash when cfg.retentionPeriod == 0);
 *   - moves the leading files that satisfy the retention size/period rules
 *     from fileInfoSet to toDeleteFiles and updates firstVer/writeCur/totSize;
 *   - saves the meta, then removes the log/idx files listed in toDeleteFiles.
 *
 * A file-removal failure other than ENOENT stops the deletion loop without
 * clearing toDeleteFiles and without changing the return code.
 *
 * @return 0 on success, otherwise the error code of the failing step.
 */
int32_t walEndSnapshot(SWal *pWal) {
  int32_t code = 0, lino = 0;

  if (pWal->cfg.level == TAOS_WAL_SKIP) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
  int64_t ver = pWal->vers.verInSnapshotting;

  wDebug("vgId:%d, wal end snapshot for index:%" PRId64 ", log retention:%" PRId64 " first index:%" PRId64
         ", last index:%" PRId64,
         pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);

  if (ver == -1) {
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
  }

  pWal->vers.snapshotVer = ver;
  int ts = taosGetTimestampSec();
  ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1);

  // compatible mode for refVer
  bool    hasTopic = false;
  int64_t refVer = INT64_MAX;
  void   *pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pWal->pRefHash, pIter);
    if (pIter == NULL) break;
    SWalRef *pRef = *(SWalRef **)pIter;
    if (pRef->refVer == -1) continue;
    refVer = TMIN(refVer, pRef->refVer - 1);
    hasTopic = true;
  }
  if (pWal->cfg.retentionPeriod == 0 && hasTopic) {
    wInfo("vgId:%d, wal found ref index:%" PRId64 " in compatible mode, index:%" PRId64, pWal->cfg.vgId, refVer, ver);
    ver = TMIN(ver, refVer);
  }

  // find files safe to delete
  int          deleteCnt = 0;
  int64_t      newTotSize = pWal->totSize;
  SWalFileInfo tmp = {0};
  tmp.firstVer = ver;
  SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);

  if (pInfo) {
    wDebug("vgId:%d, wal search found file info, index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64,
           pWal->cfg.vgId, ver, pInfo->firstVer, pInfo->lastVer);
    if (ver > pInfo->lastVer) {
      TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
    }

    // the found file is fully covered, so it is a deletion candidate as well
    if (ver == pInfo->lastVer) {
      pInfo++;
    }

    // iterate files, until the searched result
    // delete according to file size or close time
    SWalFileInfo *pUntil = NULL;
    for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
      if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) ||
          (pWal->cfg.retentionPeriod == 0 ||
           (pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts))) {
        newTotSize -= iter->fileSize;
        pUntil = iter;
      }
    }

    // pUntil stays NULL when nothing qualified; a relational comparison
    // against a null pointer is undefined, so test for it explicitly
    if (pUntil != NULL) {
      for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter <= pUntil; iter++) {
        deleteCnt++;
        if (taosArrayPush(pWal->toDeleteFiles, iter) == NULL) {
          wError("vgId:%d, failed to push file info to delete list", pWal->cfg.vgId);
        }
      }
    }

    // make new array, remove files
    taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
    if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
      pWal->writeCur = -1;
      pWal->vers.firstVer = pWal->vers.lastVer + 1;
    } else {
      pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
    }
  }

  // update meta
  pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
  pWal->totSize = newTotSize;
  pWal->vers.verInSnapshotting = -1;

  TAOS_CHECK_GOTO(walSaveMeta(pWal), &lino, _exit);

  // delete files
  deleteCnt = taosArrayGetSize(pWal->toDeleteFiles);
  char fnameStr[WAL_FILE_LEN] = {0};
  pInfo = NULL;

  for (int i = 0; i < deleteCnt; i++) {
    pInfo = taosArrayGet(pWal->toDeleteFiles, i);

    walBuildLogName(pWal, pInfo->firstVer, fnameStr);
    if (taosRemoveFile(fnameStr) < 0 && ERRNO != ENOENT) {
      wError("vgId:%d, failed to remove log file %s since %s", pWal->cfg.vgId, fnameStr, strerror(ERRNO));
      goto _exit;
    }
    walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
    if (taosRemoveFile(fnameStr) < 0 && ERRNO != ENOENT) {
      wError("vgId:%d, failed to remove idx file %s since %s", pWal->cfg.vgId, fnameStr, strerror(ERRNO));
      goto _exit;
    }
  }
  if (pInfo != NULL) {
    wInfo("vgId:%d, wal log files recycled, count:%d, until index:%" PRId64 ", closeTs:%" PRId64, pWal->cfg.vgId,
          deleteCnt, pInfo->lastVer, pInfo->closeTs);
  }
  taosArrayClear(pWal->toDeleteFiles);

_exit:
  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));

  if (code) {
    wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
  }

  return code;
}
491

492
/**
 * Invoke the stopDnode callback of the WAL, if one is registered.
 * A warning is logged before the callback is called.
 */
static void walStopDnode(SWal *pWal) {
  if (pWal->stopDnode == NULL) {
    return;
  }

  wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
  pWal->stopDnode();
}
×
498

499
/**
 * Append the idx entry {ver, offset} to the current idx file.
 *
 * On a short/failed write the stopDnode callback is triggered and terrno is
 * returned. After a successful write the end of the idx file is sought; if
 * that seek fails the process logs a fatal message and exits.
 *
 * @param pWal   WAL handle.
 * @param ver    version of the record being indexed.
 * @param offset offset stored in the entry for that record.
 * @param trace  trace id passed to the logging macros.
 * @return TSDB_CODE_SUCCESS, or terrno when the entry could not be written.
 */
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset, const STraceId *trace) {
  SWalIdxEntry  entry = {.ver = ver, .offset = offset};
  SWalFileInfo *pCurFile = walGetCurFileInfo(pWal);

  // only used for tracing: where the entry is expected to land in the idx file
  int64_t idxOffset = (entry.ver - pCurFile->firstVer) * sizeof(SWalIdxEntry);
  wGTrace(trace, "vgId:%d, index:%" PRId64 ", write log entry at %" PRId64 ", offset:%" PRId64, pWal->cfg.vgId, ver,
          idxOffset, offset);

  if (taosWriteFile(pWal->pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
    wGError(trace, "vgId:%d, index:%" PRId64 ", failed to write entry since %s", pWal->cfg.vgId, ver, strerror(ERRNO));
    walStopDnode(pWal);
    TAOS_RETURN(terrno);
  }

  // check alignment of idx entries
  int64_t endOffset = taosLSeekFile(pWal->pIdxFile, 0, SEEK_END);
  if (endOffset >= 0) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  wGFatal(trace, "vgId:%d, index:%" PRId64 ", failed to seek end of WAL idxfile since %s, endOffset:%" PRId64,
          pWal->cfg.vgId, ver, tstrerror(terrno), endOffset);
  taosMsleep(100);
  exit(EXIT_FAILURE);
}
527

528
/**
 * Write one WAL record (idx entry, head and body) for version `index`.
 *
 * The write head of the WAL is filled in and checksummed first. Unless the
 * WAL level is TAOS_WAL_SKIP, the idx entry, the head and the body are then
 * written to the current files. With cfg.encryptAlgorithm == DND_CA_SM4 the
 * body is copied into a zero-filled buffer of ENCRYPTED_LEN(bodyLen) bytes
 * and encrypted with CBC_Encrypt() before being written. On success the
 * version and size bookkeeping of the WAL and of the current file info is
 * updated.
 *
 * On failure the log and idx files are truncated back to where this record
 * started; if a truncate fails the process logs a fatal message and exits.
 *
 * @return 0 on success, TSDB_CODE_FAILED after a recovered write failure.
 */
static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta,
                                         const void *body, int32_t bodyLen, const STraceId *trace) {
  int32_t code = 0, lino = 0;

  // remember where this record starts so a failed write can be undone
  int64_t       startOffset = walGetCurFileOffset(pWal);
  SWalFileInfo *pCurFile = walGetCurFileInfo(pWal);

  pWal->writeHead.head.version = index;
  pWal->writeHead.head.bodyLen = bodyLen;
  pWal->writeHead.head.msgType = msgType;
  pWal->writeHead.head.ingestTs = taosGetTimestampUs();

  // sync info for sync module
  pWal->writeHead.head.syncMeta = syncMeta;

  pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
  pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
  wGDebug(trace, "vgId:%d, index:%" PRId64 ", write log, type:%s, cksum head:%u, cksum body:%u", pWal->cfg.vgId, index,
          TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody);

  const bool persist = (pWal->cfg.level != TAOS_WAL_SKIP);

  if (persist) {
    TAOS_CHECK_GOTO(walWriteIndex(pWal, index, startOffset, trace), &lino, _exit);

    if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
      code = terrno;
      wGError(trace, "vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId,
              walGetLastFileFirstVer(pWal), strerror(ERRNO));
      walStopDnode(pWal);
      TAOS_CHECK_GOTO(code, &lino, _exit);
    }
  }

  int32_t storedLen = bodyLen;        // number of body bytes that end up in the log file
  char   *pPayload = (char *)body;    // what is actually written: plain or encrypted body
  char   *pPadded = NULL;
  char   *pCipher = NULL;

  if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
    storedLen = ENCRYPTED_LEN(storedLen);

    pPadded = taosMemoryMalloc(storedLen);
    TSDB_CHECK_NULL(pPadded, code, lino, _exit, terrno);

    // zero-fill first so the bytes beyond bodyLen are defined
    (void)memset(pPadded, 0, storedLen);
    (void)memcpy(pPadded, body, bodyLen);

    pCipher = taosMemoryMalloc(storedLen);
    if (pCipher == NULL) {
      wGError(trace, "vgId:%d, file:%" PRId64 ".log, failed to malloc since %s", pWal->cfg.vgId,
              walGetLastFileFirstVer(pWal), strerror(ERRNO));

      taosMemoryFreeClear(pPadded);

      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    SCryptOpts opts;
    opts.len = storedLen;
    opts.source = pPadded;
    opts.result = pCipher;
    opts.unitLen = 16;
    tstrncpy((char *)opts.key, pWal->cfg.encryptKey, ENCRYPT_KEY_LEN + 1);

    (void)CBC_Encrypt(&opts);

    pPayload = pCipher;
  }

  if (persist && taosWriteFile(pWal->pLogFile, (char *)pPayload, storedLen) != storedLen) {
    code = terrno;
    wGError(trace, "vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId,
            walGetLastFileFirstVer(pWal), strerror(ERRNO));

    if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
      taosMemoryFreeClear(pPadded);
      taosMemoryFreeClear(pCipher);
    }

    walStopDnode(pWal);

    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
    taosMemoryFreeClear(pPadded);
    taosMemoryFreeClear(pCipher);
  }

  // set status
  if (pWal->vers.firstVer == -1) {
    pWal->vers.firstVer = index;
  }
  pWal->vers.lastVer = index;
  pWal->totSize += sizeof(SWalCkHead) + storedLen;
  pCurFile->lastVer = index;
  pCurFile->fileSize += sizeof(SWalCkHead) + storedLen;

  return 0;

_exit:
  if (code) {
    wGError(trace, "vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
  }

  // recover in a reverse order
  if (taosFtruncateFile(pWal->pLogFile, startOffset) < 0) {
    wGFatal(trace, "vgId:%d, failed to recover WAL logfile from write error since %s, offset:%" PRId64, pWal->cfg.vgId,
            terrstr(), startOffset);
    taosMsleep(100);
    exit(EXIT_FAILURE);
  }

  int64_t idxOffset = (index - pCurFile->firstVer) * sizeof(SWalIdxEntry);
  if (taosFtruncateFile(pWal->pIdxFile, idxOffset) < 0) {
    wGFatal(trace, "vgId:%d, failed to recover WAL idxfile from write error since %s, offset:%" PRId64, pWal->cfg.vgId,
            terrstr(), idxOffset);
    taosMsleep(100);
    exit(EXIT_FAILURE);
  }

  TAOS_RETURN(TSDB_CODE_FAILED);
}
655

656
/**
 * Open the idx/log files that new records are appended to.
 *
 * The file pair of the last entry in fileInfoSet is used. When fileInfoSet
 * is empty, the pair is named after vers.lastVer + 1 and a new file-info
 * entry is created with walRollFileInfo(). (The original dereferenced the
 * NULL "last entry" in the empty case because the fallback lacked an else.)
 *
 * On success the opened handles become the current write files and writeCur
 * points at the last entry of fileInfoSet.
 *
 * @return TSDB_CODE_SUCCESS, terrno when a file cannot be opened, or the
 *         negative code returned by walRollFileInfo().
 */
static int32_t walInitWriteFile(SWal *pWal) {
  TdFilePtr     pIdxTFile, pLogTFile;
  int32_t       code = 0;
  SWalFileInfo *pRet = taosArrayGetLast(pWal->fileInfoSet);

  // fall back to the next version only when there is no file info at all
  int64_t fileFirstVer = (pRet != NULL) ? pRet->firstVer : pWal->vers.lastVer + 1;

  char fnameStr[WAL_FILE_LEN];
  walBuildIdxName(pWal, fileFirstVer, fnameStr);
  pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_APPEND);
  if (pIdxTFile == NULL) {
    TAOS_RETURN(terrno);
  }
  walBuildLogName(pWal, fileFirstVer, fnameStr);
  pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_APPEND);
  if (pLogTFile == NULL) {
    // do not leak the idx handle that was opened just above
    TAOS_UNUSED(taosCloseFile(&pIdxTFile));
    TAOS_RETURN(terrno);
  }
  // switch file
  pWal->pIdxFile = pIdxTFile;
  pWal->pLogFile = pLogTFile;
  if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
    code = walRollFileInfo(pWal);
    if (code < 0) {
      wError("vgId:%d, failed to roll file info while init write file since %s", pWal->cfg.vgId, terrstr());
      TAOS_RETURN(code);
    }
  }
  pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
692

693
/**
 * Append one record with version `index` to the WAL.
 *
 * Runs under the WAL write lock. `index` has to be exactly
 * vers.lastVer + 1. The WAL is rolled to a new file if walCheckAndRoll()
 * decides so, the write files are opened when they are not ready yet, and
 * the record is written with walWriteImpl().
 *
 * @return 0 on success, TSDB_CODE_WAL_INVALID_VER for an unexpected index,
 *         or the error code of the failing step.
 */
int32_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
                     int32_t bodyLen, const STraceId *trace) {
  int32_t code = 0, lino = 0;

  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));

  const int64_t expectedIndex = pWal->vers.lastVer + 1;
  if (index != expectedIndex) {
    TAOS_CHECK_GOTO(TSDB_CODE_WAL_INVALID_VER, &lino, _exit);
  }

  TAOS_CHECK_GOTO(walCheckAndRoll(pWal), &lino, _exit);

  // both handles and a valid cursor are needed before anything can be written
  const bool writerReady = (pWal->pLogFile != NULL) && (pWal->pIdxFile != NULL) && (pWal->writeCur >= 0);
  if (!writerReady) {
    TAOS_CHECK_GOTO(walInitWriteFile(pWal), &lino, _exit);
  }

  TAOS_CHECK_GOTO(walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen, trace), &lino, _exit);

_exit:
  if (code) {
    wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
  return code;
}
719

720
/**
 * Fsync the current WAL log file when required.
 *
 * Nothing is done for WAL level TAOS_WAL_SKIP. Otherwise, under the write
 * lock, the log file is fsynced when `forceFsync` is set, or when the level
 * is TAOS_WAL_FSYNC and cfg.fsyncPeriod is 0.
 *
 * @return 0 on success or when no fsync was needed, terrno when the fsync
 *         failed.
 */
int32_t walFsync(SWal *pWal, bool forceFsync) {
  if (pWal->cfg.level == TAOS_WAL_SKIP) {
    return 0;
  }

  int32_t code = 0;

  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));

  const bool syncNow = forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0);
  if (syncNow) {
    wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));

    if (taosFsyncFile(pWal->pLogFile) < 0) {
      wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
             strerror(ERRNO));
      code = terrno;
    }
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc