• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.76
/source/libs/wal/src/walWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "crypt.h"
17
#include "os.h"
18
#include "taoserror.h"
19
#include "tchecksum.h"
20
#include "tglobal.h"
21
#include "walInt.h"
22

23
/*
 * Reset the WAL so that it continues right after a snapshot taken at `ver`.
 *
 * The WAL write lock is held for the whole operation and is released on every
 * exit path.
 *
 * Steps:
 *   1. Fail with TSDB_CODE_FAILED if any entry of pRefHash has a refVer that
 *      is set (!= -1) and <= ver.
 *   2. Close the current log/idx handles.
 *   3. If firstVer != -1, remove the log and idx file of every entry in
 *      fileInfoSet; the first removal failure aborts with terrno.
 *   4. Remove the meta through walRemoveMeta().
 *   5. Reset the in-memory cursor, size and version fields to describe an
 *      empty WAL whose next entry is ver + 1.
 *
 * @param pWal  WAL handle.
 * @param ver   Version covered by the snapshot.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_FAILED, terrno of a failed file
 *         removal, or the code returned by walRemoveMeta().
 */
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
  int32_t code = TSDB_CODE_SUCCESS;

  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));

  wInfo("vgId:%d, restore from snapshot, index:%" PRId64, pWal->cfg.vgId, ver);

  void *pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pWal->pRefHash, pIter);
    if (pIter == NULL) break;
    SWalRef *pRef = *(SWalRef **)pIter;
    if (pRef->refVer != -1 && pRef->refVer <= ver) {
      // leaving the iteration early, so it has to be cancelled explicitly
      taosHashCancelIterate(pWal->pRefHash, pIter);
      code = TSDB_CODE_FAILED;
      goto _exit;
    }
  }

  TAOS_UNUSED(taosCloseFile(&pWal->pLogFile));
  TAOS_UNUSED(taosCloseFile(&pWal->pIdxFile));

  if (pWal->vers.firstVer != -1) {
    int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
    for (int32_t i = 0; i < fileSetSize; i++) {
      SWalFileInfo *pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
      char          fnameStr[WAL_FILE_LEN];

      walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
      if (taosRemoveFile(fnameStr) < 0) {
        wError("vgId:%d, restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
        code = terrno;
        goto _exit;
      }
      wInfo("vgId:%d, restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);

      walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
      if (taosRemoveFile(fnameStr) < 0) {
        wError("vgId:%d, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
        code = terrno;
        goto _exit;
      }
      wInfo("vgId:%d, restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
    }
  }

  // Previously this failure returned directly and left pWal->mutex write-locked.
  code = walRemoveMeta(pWal);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  pWal->writeCur = -1;
  pWal->totSize = 0;
  pWal->lastRollSeq = -1;

  taosArrayClear(pWal->fileInfoSet);
  pWal->vers.firstVer = ver + 1;
  pWal->vers.lastVer = ver;
  pWal->vers.commitVer = ver;
  pWal->vers.snapshotVer = ver;
  pWal->vers.verInSnapshotting = -1;

_exit:
  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));

  TAOS_RETURN(code);
}
88

89
/*
 * Store `ver` as the applied version of the WAL.
 * The value is written as-is; no range check and no locking happen here.
 */
void walApplyVer(SWal *pWal, int64_t ver) {
  // TODO: error check
  (pWal->vers).appliedVer = ver;
}
2,225,088✔
93

94
/*
 * Advance the commit version of the WAL to `ver`.
 *
 * - ver below the current commitVer: nothing to do, success.
 * - ver above lastVer, or commitVer currently below snapshotVer:
 *   TSDB_CODE_WAL_INVALID_VER.
 * - otherwise commitVer becomes ver and the call succeeds.
 */
int32_t walCommit(SWal *pWal, int64_t ver) {
  if (ver >= pWal->vers.commitVer) {
    if (ver <= pWal->vers.lastVer && pWal->vers.commitVer >= pWal->vers.snapshotVer) {
      pWal->vers.commitVer = ver;
      TAOS_RETURN(TSDB_CODE_SUCCESS);
    }

    TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
  }

  // already committed past this version
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
102

103
/*
 * Switch the WAL write handles to the file that contains version `ver`.
 *
 * The currently open log/idx handles are fsynced (unless the WAL level is
 * TAOS_WAL_SKIP) and closed, then the file whose firstVer is the greatest
 * value <= ver is looked up in fileInfoSet and its idx and log files are
 * opened for appending. On success pLogFile, pIdxFile and writeCur point at
 * that file.
 *
 * @param pWal  WAL handle.
 * @param ver   Version that the selected file must contain.
 * @return firstVer of the selected file, or -1 on failure. When no file in
 *         fileInfoSet matches, terrno is set to TSDB_CODE_WAL_INVALID_VER;
 *         for the other failures terrno is whatever the failing file
 *         operation left behind.
 */
int64_t walChangeWrite(SWal *pWal, int64_t ver) {
  TdFilePtr pIdxTFile = NULL, pLogTFile = NULL;
  char      fnameStr[WAL_FILE_LEN];

  if (pWal->pLogFile != NULL) {
    if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pLogFile) != 0) {
      return -1;
    }
    if (taosCloseFile(&pWal->pLogFile) != 0) {
      return -1;
    }
  }
  if (pWal->pIdxFile != NULL) {
    if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pIdxFile) != 0) {
      return -1;
    }
    if (taosCloseFile(&pWal->pIdxFile) != 0) {
      return -1;
    }
  }

  // bsearch in fileSet
  SWalFileInfo tmpInfo = {0};
  tmpInfo.firstVer = ver;
  int32_t idx = taosArraySearchIdx(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);

  // The search can miss (e.g. empty set or ver below the first file); the
  // original dereferenced the lookup result unconditionally.
  SWalFileInfo *pFileInfo = (idx >= 0) ? taosArrayGet(pWal->fileInfoSet, idx) : NULL;
  if (pFileInfo == NULL) {
    terrno = TSDB_CODE_WAL_INVALID_VER;
    return -1;
  }

  int64_t fileFirstVer = pFileInfo->firstVer;

  walBuildIdxName(pWal, fileFirstVer, fnameStr);
  pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
  if (pIdxTFile == NULL) {
    pWal->pIdxFile = NULL;

    return -1;
  }

  walBuildLogName(pWal, fileFirstVer, fnameStr);
  pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
  if (pLogTFile == NULL) {
    // do not leak the idx handle that was just opened
    TAOS_UNUSED(taosCloseFile(&pIdxTFile));
    pWal->pLogFile = NULL;

    return -1;
  }

  pWal->pLogFile = pLogTFile;
  pWal->pIdxFile = pIdxTFile;
  pWal->writeCur = idx;

  return fileFirstVer;
}
156

157
/*
 * Roll the WAL back so that `ver` and everything after it is discarded.
 *
 * Runs under the WAL write lock. `ver` must satisfy
 * commitVer < ver, snapshotVer < ver and ver <= lastVer, otherwise
 * TSDB_CODE_WAL_INVALID_VER is returned.
 *
 * If `ver` lies before the first version of the last file, the write handles
 * are moved to the file containing `ver` (walChangeWrite) and all later files
 * are popped from fileInfoSet and removed from disk (removal failures are
 * only logged). The idx entry of `ver` is then read, the log record it points
 * at is validated (head checksum and version), and both the log and idx file
 * are truncated at that record. Finally lastVer and the last file info are
 * updated and the meta is saved.
 *
 * @return 0 on success, otherwise an error code; the failure is logged
 *         together with the source line that detected it.
 */
int32_t walRollback(SWal *pWal, int64_t ver) {
  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
  wInfo("vgId:%d, wal rollback for index:%" PRId64, pWal->cfg.vgId, ver);
  int32_t      code = 0;
  int32_t      lino = 0;
  int64_t      ret;
  int64_t      idxOff = 0;
  char         fnameStr[WAL_FILE_LEN];
  TdFilePtr    pIdxFile = NULL, pLogFile = NULL;
  SWalIdxEntry entry;
  SWalCkHead   head;

  if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) {
    code = TSDB_CODE_WAL_INVALID_VER;
    lino = __LINE__;
    goto _exit;
  }

  // find correct file
  if (ver < walGetLastFileFirstVer(pWal)) {
    // change current files
    // walChangeWrite() returns an int64_t first version (or -1). Test the
    // 64-bit value directly: storing it in the int32_t `code` first could
    // turn a large valid version into a negative number.
    if (walChangeWrite(pWal, ver) < 0) {
      code = terrno;
      lino = __LINE__;
      goto _exit;
    }

    // delete files in descending order
    int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
    for (int i = pWal->writeCur + 1; i < fileSetSize; i++) {
      SWalFileInfo *pInfo = taosArrayPop(pWal->fileInfoSet);

      walBuildLogName(pWal, pInfo->firstVer, fnameStr);
      wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr);
      if (taosRemoveFile(fnameStr) != 0) {
        wWarn("vgId:%d, failed to remove file %s for rollback since %s", pWal->cfg.vgId, fnameStr, terrstr());
      }
      walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
      wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr);
      if (taosRemoveFile(fnameStr) != 0) {
        wWarn("vgId:%d, failed to remove file %s for rollback since %s", pWal->cfg.vgId, fnameStr, terrstr());
      }
    }
  }

  walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
  pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
  TSDB_CHECK_NULL(pIdxFile, code, lino, _exit, terrno);

  idxOff = walGetVerIdxOffset(pWal, ver);
  ret = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
  if (ret < 0) {
    code = terrno;
    lino = __LINE__;
    goto _exit;
  }

  // read idx file and get log file pos
  if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
    // A short read is not necessarily accompanied by an error code; never
    // let it fall through as success.
    code = (terrno != 0) ? terrno : TSDB_CODE_WAL_FILE_CORRUPTED;
    lino = __LINE__;
    goto _exit;
  }

  walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
  pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
  wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr);
  TSDB_CHECK_NULL(pLogFile, code, lino, _exit, terrno);

  ret = taosLSeekFile(pLogFile, entry.offset, SEEK_SET);
  if (ret < 0) {
    code = terrno;
    lino = __LINE__;
    goto _exit;
  }

  // validate offset
  if (taosReadFile(pLogFile, &head, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
    code = (terrno != 0) ? terrno : TSDB_CODE_WAL_FILE_CORRUPTED;
    lino = __LINE__;
    goto _exit;
  }

  code = walValidHeadCksum(&head);
  if (code != 0 || head.head.version != ver) {
    code = TSDB_CODE_WAL_FILE_CORRUPTED;
    lino = __LINE__;
    goto _exit;
  }

  // truncate old files
  if ((code = taosFtruncateFile(pLogFile, entry.offset)) < 0) {
    lino = __LINE__;
    goto _exit;
  }

  if ((code = taosFtruncateFile(pIdxFile, idxOff)) < 0) {
    lino = __LINE__;
    goto _exit;
  }

  pWal->vers.lastVer = ver - 1;
  ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
  ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;

  TAOS_CHECK_EXIT(walSaveMeta(pWal));

_exit:
  if (code != 0) {
    wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
  }
  TAOS_UNUSED(taosCloseFile(&pIdxFile));
  TAOS_UNUSED(taosCloseFile(&pLogFile));
  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));

  TAOS_RETURN(code);
}
255

256
/*
 * Close the current log/idx pair and start a new pair whose first version is
 * lastVer + 1.
 *
 * When the WAL level is TAOS_WAL_SKIP and both handles are already open the
 * call is a no-op. Otherwise the open handles are fsynced and closed, a new
 * idx file and log file are created, a new file info is appended through
 * walRollFileInfo(), the WAL is switched to the new handles, lastRollSeq is
 * refreshed and the meta is saved.
 *
 * Handles opened here are closed again if the function fails before they have
 * been handed over to `pWal`.
 *
 * @return 0 on success, otherwise the error code of the failing step (logged
 *         with the line that detected it).
 */
int32_t walRollImpl(SWal *pWal) {
  int32_t   code = 0, lino = 0;
  int64_t   newFileFirstVer = 0;
  char      fnameStr[WAL_FILE_LEN];
  TdFilePtr pIdxFile = NULL, pLogFile = NULL;

  if (pWal->cfg.level == TAOS_WAL_SKIP && pWal->pIdxFile != NULL && pWal->pLogFile != NULL) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }
  if (pWal->pIdxFile != NULL) {
    if ((code = taosFsyncFile(pWal->pIdxFile)) != 0) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
    code = taosCloseFile(&pWal->pIdxFile);
    if (code != 0) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

  if (pWal->pLogFile != NULL) {
    if ((code = taosFsyncFile(pWal->pLogFile)) != 0) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
    code = taosCloseFile(&pWal->pLogFile);
    if (code != 0) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

  // create new file
  newFileFirstVer = pWal->vers.lastVer + 1;
  walBuildIdxName(pWal, newFileFirstVer, fnameStr);
  pIdxFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_APPEND);
  TSDB_CHECK_NULL(pIdxFile, code, lino, _exit, terrno);

  walBuildLogName(pWal, newFileFirstVer, fnameStr);
  pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_APPEND);
  wDebug("vgId:%d, wal create new file for write:%s", pWal->cfg.vgId, fnameStr);
  TSDB_CHECK_NULL(pLogFile, code, lino, _exit, terrno);

  TAOS_CHECK_GOTO(walRollFileInfo(pWal), &lino, _exit);

  // switch file
  pWal->pIdxFile = pIdxFile;
  pWal->pLogFile = pLogFile;
  pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
  // ownership now lies with pWal; the cleanup below must not close them
  pIdxFile = NULL;
  pLogFile = NULL;

  pWal->lastRollSeq = walGetSeq();

  TAOS_CHECK_GOTO(walSaveMeta(pWal), &lino, _exit);

_exit:
  if (code) {
    wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));

    // release handles that were opened but never installed into pWal
    if (pIdxFile != NULL) {
      TAOS_UNUSED(taosCloseFile(&pIdxFile));
    }
    if (pLogFile != NULL) {
      TAOS_UNUSED(taosCloseFile(&pLogFile));
    }
  }

  TAOS_RETURN(code);
}
313

314
/*
 * Decide whether a new WAL file has to be started before the next write.
 *
 * - No file info yet: always roll, then return.
 * - Otherwise roll when the roll period is enabled (neither -1 nor 0) and
 *   more than rollPeriod has passed since lastRollSeq, or, failing that, when
 *   the segment size is enabled (neither -1 nor 0) and the last file is
 *   larger than segSize.
 * - Independently, save the meta when the cached size of the last file
 *   exceeds tsWalFsyncDataSizeLimit.
 *
 * Any failing step returns its error code immediately.
 */
static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
  const bool noFileYet = (taosArrayGetSize(pWal->fileInfoSet) == 0);
  if (noFileYet) {
    TAOS_CHECK_RETURN(walRollImpl(pWal));

    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  int64_t elapsed = walGetSeq() - pWal->lastRollSeq;

  // short-circuit keeps the size probe from running once the period matched
  bool rollNow = (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && elapsed > pWal->cfg.rollPeriod) ||
                 (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize);
  if (rollNow) {
    TAOS_CHECK_RETURN(walRollImpl(pWal));
  }

  if (walGetLastFileCachedSize(pWal) > tsWalFsyncDataSizeLimit) {
    TAOS_CHECK_RETURN(walSaveMeta(pWal));
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
334

335
/*
 * Mark the start of a snapshot at version `ver`.
 *
 * Nothing happens (success) when the WAL level is TAOS_WAL_SKIP. A negative
 * `logRetention` is rejected with TSDB_CODE_FAILED. Otherwise, under the
 * write lock, verInSnapshotting and logRetention are recorded and, if the
 * last file is not empty, the WAL is rolled to a fresh file.
 *
 * @return 0 on success or the code returned by walRollImpl().
 */
int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pWal->cfg.level == TAOS_WAL_SKIP) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }
  if (logRetention < 0) {
    TAOS_RETURN(TSDB_CODE_FAILED);
  }

  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));

  pWal->vers.verInSnapshotting = ver;
  pWal->vers.logRetention = logRetention;

  wDebug("vgId:%d, wal begin snapshot for index:%" PRId64 ", log retention:%" PRId64 " first index:%" PRId64
         ", last index:%" PRId64,
         pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);

  // check file rolling
  if (walGetLastFileSize(pWal) != 0) {
    code = walRollImpl(pWal);
  }

  if (code) {
    wError("vgId:%d, %s failed since %s at line %d", pWal->cfg.vgId, __func__, tstrerror(code), lino);
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
  TAOS_RETURN(code);
}
364

365
/*
 * Finish the snapshot started by walBeginSnapshot() and recycle WAL files
 * that are no longer needed.
 *
 * Nothing happens (success) when the WAL level is TAOS_WAL_SKIP. Under the
 * write lock:
 *   1. verInSnapshotting must be set (!= -1), otherwise TSDB_CODE_FAILED.
 *   2. snapshotVer takes that version; the deletion bound is then
 *      max(ver - logRetention, firstVer - 1). When retentionPeriod is 0 and
 *      any entry of pRefHash has a refVer set, the bound is additionally
 *      capped at (smallest refVer - 1).
 *   3. Files strictly before the file containing the bound (or including it
 *      when the bound equals its lastVer) are candidates. Scanning from the
 *      oldest, a candidate is selected when the retention size is exceeded,
 *      retentionPeriod is 0, or its closeTs is older than retentionPeriod.
 *      Every file up to the last selected one is queued in toDeleteFiles and
 *      dropped from fileInfoSet.
 *   4. Meta is saved, then the queued log/idx files are removed from disk. A
 *      removal failure other than ENOENT stops the loop and keeps the queue
 *      for a later call; the function still returns the current `code`.
 *
 * @return 0 on success, TSDB_CODE_FAILED for an invalid state, or the error
 *         of walSaveMeta().
 */
int32_t walEndSnapshot(SWal *pWal) {
  int32_t code = 0, lino = 0;

  if (pWal->cfg.level == TAOS_WAL_SKIP) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
  int64_t ver = pWal->vers.verInSnapshotting;

  wDebug("vgId:%d, wal end snapshot for index:%" PRId64 ", log retention:%" PRId64 " first index:%" PRId64
         ", last index:%" PRId64,
         pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);

  if (ver == -1) {
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
  }

  pWal->vers.snapshotVer = ver;
  int ts = taosGetTimestampSec();
  ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1);

  // compatible mode for refVer
  bool    hasTopic = false;
  int64_t refVer = INT64_MAX;
  void   *pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pWal->pRefHash, pIter);
    if (pIter == NULL) break;
    SWalRef *pRef = *(SWalRef **)pIter;
    if (pRef->refVer == -1) continue;
    refVer = TMIN(refVer, pRef->refVer - 1);
    hasTopic = true;
  }
  if (pWal->cfg.retentionPeriod == 0 && hasTopic) {
    wInfo("vgId:%d, wal found ref index:%" PRId64 " in compatible mode, index:%" PRId64, pWal->cfg.vgId, refVer, ver);
    ver = TMIN(ver, refVer);
  }

  // find files safe to delete
  int          deleteCnt = 0;
  int64_t      newTotSize = pWal->totSize;
  SWalFileInfo tmp = {0};
  tmp.firstVer = ver;
  SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);

  if (pInfo) {
    wDebug("vgId:%d, wal search found file info, index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64,
           pWal->cfg.vgId, ver, pInfo->firstVer, pInfo->lastVer);
    if (ver > pInfo->lastVer) {
      TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
    }

    if (ver == pInfo->lastVer) {
      pInfo++;
    }

    // iterate files, until the searched result
    // delete according to file size or close time
    SWalFileInfo *pFirst = pWal->fileInfoSet->pData;
    int           untilCnt = 0;  // number of leading files to recycle; 0 when none was selected
    for (SWalFileInfo *iter = pFirst; iter < pInfo; iter++) {
      if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) ||
          (pWal->cfg.retentionPeriod == 0 ||
           (pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts))) {
        newTotSize -= iter->fileSize;
        untilCnt = (int)(iter - pFirst) + 1;
      }
    }
    // Counting by index avoids the former `iter <= pUntil` test, which
    // compared against a NULL pointer whenever no file had been selected.
    for (int i = 0; i < untilCnt; i++) {
      deleteCnt++;
      if (taosArrayPush(pWal->toDeleteFiles, &pFirst[i]) == NULL) {
        wError("vgId:%d, failed to push file info to delete list", pWal->cfg.vgId);
      }
    }

    // make new array, remove files
    taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
    if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
      pWal->writeCur = -1;
      pWal->vers.firstVer = pWal->vers.lastVer + 1;
    } else {
      pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
    }
  }

  // update meta
  pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
  pWal->totSize = newTotSize;
  pWal->vers.verInSnapshotting = -1;

  TAOS_CHECK_GOTO(walSaveMeta(pWal), &lino, _exit);

  // delete files
  deleteCnt = taosArrayGetSize(pWal->toDeleteFiles);
  char fnameStr[WAL_FILE_LEN] = {0};
  pInfo = NULL;

  for (int i = 0; i < deleteCnt; i++) {
    pInfo = taosArrayGet(pWal->toDeleteFiles, i);

    walBuildLogName(pWal, pInfo->firstVer, fnameStr);
    if (taosRemoveFile(fnameStr) < 0 && ERRNO != ENOENT) {
      wError("vgId:%d, failed to remove log file %s since %s", pWal->cfg.vgId, fnameStr, strerror(ERRNO));
      goto _exit;
    }
    walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
    if (taosRemoveFile(fnameStr) < 0 && ERRNO != ENOENT) {
      wError("vgId:%d, failed to remove idx file %s since %s", pWal->cfg.vgId, fnameStr, strerror(ERRNO));
      goto _exit;
    }
  }
  if (pInfo != NULL) {
    wInfo("vgId:%d, wal log files recycled, count:%d, until index:%" PRId64 ", closeTs:%" PRId64, pWal->cfg.vgId,
          deleteCnt, pInfo->lastVer, pInfo->closeTs);
  }
  taosArrayClear(pWal->toDeleteFiles);

_exit:
  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));

  if (code) {
    wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
  }

  return code;
}
491

492
/*
 * Invoke the stopDnode callback of the WAL, if one is installed, after
 * logging a warning. Does nothing when the callback is NULL.
 */
static void walStopDnode(SWal *pWal) {
  if (pWal->stopDnode == NULL) {
    return;
  }

  wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
  pWal->stopDnode();
}
×
498

499
/*
 * Append the idx entry {ver, offset} to the currently open idx file.
 *
 * A short write is logged, triggers walStopDnode() and returns terrno.
 * After a successful write the end of the idx file is sought; if that seek
 * fails the process logs a fatal message, sleeps 100 ms and exits.
 *
 * @param pWal    WAL handle (pIdxFile must be open).
 * @param ver     Version of the log record.
 * @param offset  Offset of the log record inside the log file.
 * @param trace   Trace id used by the trace-aware log macros.
 * @return TSDB_CODE_SUCCESS, or terrno after a failed write.
 */
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset, const STraceId *trace) {
  SWalIdxEntry  entry = {.ver = ver, .offset = offset};
  SWalFileInfo *pCurInfo = walGetCurFileInfo(pWal);

  // position the entry is expected to land at; only used for tracing
  int64_t expectedPos = (entry.ver - pCurInfo->firstVer) * sizeof(SWalIdxEntry);
  wGTrace(trace, "vgId:%d, index:%" PRId64 ", write log entry at %" PRId64 ", offset:%" PRId64, pWal->cfg.vgId, ver,
          expectedPos, offset);

  int64_t nWritten = taosWriteFile(pWal->pIdxFile, &entry, sizeof(SWalIdxEntry));
  if (nWritten != sizeof(SWalIdxEntry)) {
    wGError(trace, "vgId:%d, index:%" PRId64 ", failed to write entry since %s", pWal->cfg.vgId, ver, strerror(ERRNO));
    walStopDnode(pWal);
    TAOS_RETURN(terrno);
  }

  // check alignment of idx entries
  int64_t endOffset = taosLSeekFile(pWal->pIdxFile, 0, SEEK_END);
  if (endOffset >= 0) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  wGFatal(trace, "vgId:%d, index:%" PRId64 ", failed to seek end of WAL idxfile since %s, endOffset:%" PRId64,
          pWal->cfg.vgId, ver, tstrerror(terrno), endOffset);
  taosMsleep(100);
  exit(EXIT_FAILURE);
}
527

528
/*
 * Write one log record (head + body) for version `index` to the current WAL
 * file and update the in-memory bookkeeping.
 *
 * The record head is filled in pWal->writeHead and checksummed; the body
 * checksum is taken over the plain body. Unless the WAL level is
 * TAOS_WAL_SKIP, the idx entry, the head and the body are written to disk.
 * With the DND_CA_SM4 algorithm the body is copied into a zero-padded buffer
 * of ENCRYPTED_LEN(bodyLen) bytes, encrypted with CBC_Encrypt, and the
 * encrypted buffer is what gets written and accounted for.
 *
 * On the error path both files are truncated back to the state before the
 * write; if a truncation fails the process logs a fatal message, sleeps
 * 100 ms and exits. The error path always reports TSDB_CODE_FAILED.
 *
 * @return 0 on success, TSDB_CODE_FAILED on failure.
 */
static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta,
                                         const void *body, int32_t bodyLen, const STraceId *trace) {
  int32_t code = 0, lino = 0;
  int32_t rawLen = bodyLen;

  int64_t       offset = walGetCurFileOffset(pWal);
  SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);

  // fill the record head
  pWal->writeHead.head.version = index;
  pWal->writeHead.head.bodyLen = rawLen;
  pWal->writeHead.head.msgType = msgType;
  pWal->writeHead.head.ingestTs = taosGetTimestampUs();

  // sync info for sync module
  pWal->writeHead.head.syncMeta = syncMeta;

  pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
  pWal->writeHead.cksumBody = walCalcBodyCksum(body, rawLen);
  wGDebug(trace, "vgId:%d, index:%" PRId64 ", write log, type:%s, cksum head:%u, cksum body:%u", pWal->cfg.vgId, index,
          TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody);

  if (pWal->cfg.level != TAOS_WAL_SKIP) {
    TAOS_CHECK_GOTO(walWriteIndex(pWal, index, offset, trace), &lino, _exit);

    if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
      code = terrno;
      wGError(trace, "vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId,
              walGetLastFileFirstVer(pWal), strerror(ERRNO));
      walStopDnode(pWal);
      TAOS_CHECK_GOTO(code, &lino, _exit);
    }
  }

  int32_t storedLen = rawLen;      // number of body bytes that end up on disk
  char   *pOut = (char *)body;     // buffer that is written as the body
  char   *pPadded = NULL;          // zero-padded copy of the plain body
  char   *pCipher = NULL;          // encryption output

  if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
    storedLen = ENCRYPTED_LEN(storedLen);

    pPadded = taosMemoryMalloc(storedLen);
    TSDB_CHECK_NULL(pPadded, code, lino, _exit, terrno);

    (void)memset(pPadded, 0, storedLen);
    (void)memcpy(pPadded, body, rawLen);

    pCipher = taosMemoryMalloc(storedLen);
    if (pCipher == NULL) {
      wGError(trace, "vgId:%d, file:%" PRId64 ".log, failed to malloc since %s", pWal->cfg.vgId,
              walGetLastFileFirstVer(pWal), strerror(ERRNO));

      if (pPadded != NULL) taosMemoryFreeClear(pPadded);

      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    SCryptOpts opts;
    opts.source = pPadded;
    opts.result = pCipher;
    opts.len = storedLen;
    opts.unitLen = 16;
    tstrncpy((char *)opts.key, pWal->cfg.encryptKey, ENCRYPT_KEY_LEN + 1);

    // the returned length is not inspected
    (void)CBC_Encrypt(&opts);

    pOut = pCipher;
  }

  if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pWal->pLogFile, (char *)pOut, storedLen) != storedLen) {
    code = terrno;
    wGError(trace, "vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId,
            walGetLastFileFirstVer(pWal), strerror(ERRNO));

    if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
      taosMemoryFreeClear(pPadded);
      taosMemoryFreeClear(pCipher);
    }

    walStopDnode(pWal);

    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
    taosMemoryFreeClear(pPadded);
    taosMemoryFreeClear(pCipher);
  }

  // set status
  if (pWal->vers.firstVer == -1) {
    pWal->vers.firstVer = index;
  }
  pWal->vers.lastVer = index;
  pFileInfo->lastVer = index;
  pWal->totSize += sizeof(SWalCkHead) + storedLen;
  pFileInfo->fileSize += sizeof(SWalCkHead) + storedLen;

  return 0;

_exit:
  if (code) {
    wGError(trace, "vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
  }

  // recover in a reverse order
  if (taosFtruncateFile(pWal->pLogFile, offset) < 0) {
    wGFatal(trace, "vgId:%d, failed to recover WAL logfile from write error since %s, offset:%" PRId64, pWal->cfg.vgId,
            terrstr(), offset);
    taosMsleep(100);
    exit(EXIT_FAILURE);
  }

  int64_t idxOffset = (index - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
  if (taosFtruncateFile(pWal->pIdxFile, idxOffset) < 0) {
    wGFatal(trace, "vgId:%d, failed to recover WAL idxfile from write error since %s, offset:%" PRId64, pWal->cfg.vgId,
            terrstr(), idxOffset);
    taosMsleep(100);
    exit(EXIT_FAILURE);
  }

  TAOS_RETURN(TSDB_CODE_FAILED);
}
655

656
/*
 * Open the idx/log pair that new records are appended to and install the
 * handles into `pWal`.
 *
 * The pair is the one of the last entry in fileInfoSet; when the set is
 * empty the pair for lastVer + 1 is used and a file info is created through
 * walRollFileInfo(). writeCur is set to the last index of fileInfoSet.
 *
 * @return TSDB_CODE_SUCCESS, the terrno of a failed open, or the code
 *         returned by walRollFileInfo().
 */
static int32_t walInitWriteFile(SWal *pWal) {
  TdFilePtr     pIdxTFile, pLogTFile;
  int64_t       fileFirstVer = -1;
  int32_t       code = 0;
  SWalFileInfo *pRet = taosArrayGetLast(pWal->fileInfoSet);
  if (pRet == NULL) {
    fileFirstVer = pWal->vers.lastVer + 1;
  } else {
    // only dereference the last file info when there is one
    fileFirstVer = pRet->firstVer;
  }

  char fnameStr[WAL_FILE_LEN];
  walBuildIdxName(pWal, fileFirstVer, fnameStr);
  pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_APPEND);
  if (pIdxTFile == NULL) {
    TAOS_RETURN(terrno);
  }
  walBuildLogName(pWal, fileFirstVer, fnameStr);
  pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_APPEND);
  if (pLogTFile == NULL) {
    // remember why the open failed before the close below can overwrite it
    code = terrno;
    TAOS_UNUSED(taosCloseFile(&pIdxTFile));
    TAOS_RETURN(code);
  }
  // switch file
  pWal->pIdxFile = pIdxTFile;
  pWal->pLogFile = pLogTFile;
  if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
    code = walRollFileInfo(pWal);
    if (code < 0) {
      wError("vgId:%d, failed to roll file info while init write file since %s", pWal->cfg.vgId, terrstr());
      TAOS_RETURN(code);
    }
  }
  pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
692

693
/*
 * Append one record to the WAL under the write lock.
 *
 * `index` has to be exactly lastVer + 1, otherwise TSDB_CODE_WAL_INVALID_VER
 * is returned. The call rolls to a new file when walCheckAndRoll() decides
 * so, opens the write files if they are not open yet (or writeCur is
 * negative), and then writes the record through walWriteImpl().
 *
 * @return 0 on success, otherwise the code of the failing step; failures are
 *         logged with the line that detected them.
 */
int32_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
                     int32_t bodyLen, const STraceId *trace) {
  int32_t code = 0, lino = 0;

  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));

  // records must be appended strictly in sequence
  if (pWal->vers.lastVer + 1 != index) {
    TAOS_CHECK_GOTO(TSDB_CODE_WAL_INVALID_VER, &lino, _exit);
  }

  TAOS_CHECK_GOTO(walCheckAndRoll(pWal), &lino, _exit);

  if (pWal->writeCur < 0 || pWal->pIdxFile == NULL || pWal->pLogFile == NULL) {
    TAOS_CHECK_GOTO(walInitWriteFile(pWal), &lino, _exit);
  }

  TAOS_CHECK_GOTO(walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen, trace), &lino, _exit);

_exit:
  if (code) {
    wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
  return code;
}
719

720
/*
 * Fsync the current WAL log file when required.
 *
 * Returns 0 immediately for WAL level TAOS_WAL_SKIP. Otherwise, under the
 * write lock, the log file is fsynced when `forceFsync` is set, or when the
 * level is TAOS_WAL_FSYNC with an fsync period of 0.
 *
 * @return 0 when nothing had to be done or the fsync succeeded, otherwise
 *         terrno of the failed fsync.
 */
int32_t walFsync(SWal *pWal, bool forceFsync) {
  int32_t code = 0;

  if (pWal->cfg.level == TAOS_WAL_SKIP) {
    return code;
  }

  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));

  bool syncNow = forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0);
  if (syncNow) {
    wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));

    int32_t rc = taosFsyncFile(pWal->pLogFile);
    if (rc < 0) {
      wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
             strerror(ERRNO));
      code = terrno;
    }
  }

  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc