• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.22
/source/libs/wal/src/walMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "os.h"
18
#include "taoserror.h"
19
#include "tcompare.h"
20
#include "tref.h"
21
#include "walInt.h"
22

23
typedef struct {
24
  int8_t      stop;
25
  int8_t      inited;
26
  uint32_t    seq;
27
  int32_t     refSetId;
28
  TdThread    thread;
29
  stopDnodeFn stopDnode;
30
} SWalMgmt;
31

32
static SWalMgmt tsWal = {0, .seq = 1};
33
static int32_t  walCreateThread();
34
static void     walStopThread();
35
static void     walFreeObj(void *pWal);
36

37
int64_t walGetSeq() { return (int64_t)atomic_load_32((volatile int32_t *)&tsWal.seq); }
2,254,290✔
38

39
int32_t walInit(stopDnodeFn stopDnode) {
6,699✔
40
  int8_t old;
41
  while (1) {
42
    old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2);
6,699✔
43
    if (old != 2) break;
6,699!
44
  }
45

46
  if (old == 0) {
6,699✔
47
    tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
2,403✔
48

49
    int32_t code = walCreateThread();
2,403✔
50
    if (TSDB_CODE_SUCCESS != code) {
2,403!
51
      wError("failed to init wal module since %s", tstrerror(code));
×
52
      atomic_store_8(&tsWal.inited, 0);
×
53

54
      TAOS_RETURN(code);
×
55
    }
56

57
    wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
2,403✔
58
    atomic_store_8(&tsWal.inited, 1);
2,403✔
59
  }
60

61
  if (stopDnode == NULL) {
6,699✔
62
    wWarn("failed to set stop dnode call back");
18!
63
  }
64
  tsWal.stopDnode = stopDnode;
6,699✔
65

66
  return 0;
6,699✔
67
}
68

69
void walCleanUp() {
4,781✔
70
  int8_t old;
71
  while (1) {
72
    old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 2);
4,781✔
73
    if (old != 2) break;
4,781!
74
  }
75

76
  if (old == 1) {
4,781✔
77
    walStopThread();
2,403✔
78
    taosCloseRef(tsWal.refSetId);
2,403✔
79
    wInfo("wal module is cleaned up");
2,403✔
80
    atomic_store_8(&tsWal.inited, 0);
2,403✔
81
  }
82
}
4,781✔
83

84
static int32_t walInitLock(SWal *pWal) {
14,449✔
85
  TdThreadRwlockAttr attr;
86
  (void)taosThreadRwlockAttrInit(&attr);
14,449✔
87
  (void)taosThreadRwlockAttrSetKindNP(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
14,449✔
88
  (void)taosThreadRwlockInit(&pWal->mutex, &attr);
14,449✔
89
  (void)taosThreadRwlockAttrDestroy(&attr);
14,449✔
90
  return 0;
14,449✔
91
}
92

93
int32_t walInitWriteFileForSkip(SWal *pWal) {
12✔
94
  TdFilePtr pIdxTFile = NULL, pLogTFile = NULL;
12✔
95
  int64_t   fileFirstVer = 0;
12✔
96
  int32_t   code = 0;
12✔
97

98
  char fnameStr[WAL_FILE_LEN];
99
  walBuildIdxName(pWal, fileFirstVer, fnameStr);
12✔
100
  pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
12✔
101
  if (pIdxTFile == NULL) {
12!
102
    wError("vgId:%d, failed to open file %s since %s", pWal->cfg.vgId, fnameStr, tstrerror(terrno));
×
103
    code = terrno;
×
104
    goto _exit;
×
105
  }
106
  walBuildLogName(pWal, fileFirstVer, fnameStr);
12✔
107
  pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
12✔
108
  if (pLogTFile == NULL) {
12!
109
    wError("vgId:%d, failed to open file %s since %s", pWal->cfg.vgId, fnameStr, tstrerror(terrno));
×
110
    code = terrno;
×
111
    goto _exit;
×
112
  }
113
  // switch file
114
  pWal->pIdxFile = pIdxTFile;
12✔
115
  pWal->pLogFile = pLogTFile;
12✔
116
  SWalFileInfo fileInfo;
117
  (void)memset(&fileInfo, -1, sizeof(SWalFileInfo));
12✔
118
  if (!taosArrayPush(pWal->fileInfoSet, &fileInfo)) {
24!
119
    wError("vgId:%d, failed to push fileInfo into array since %s", pWal->cfg.vgId, tstrerror(terrno));
×
120
    code = terrno;
×
121
    goto _exit;
×
122
  }
123
  pWal->writeCur = 0;
12✔
124
_exit:
12✔
125
  if (code != TSDB_CODE_SUCCESS) {
12!
126
    (void)taosCloseFile(&pIdxTFile);
×
127
    (void)taosCloseFile(&pLogTFile);
×
128
  }
129
  TAOS_RETURN(code);
12✔
130
}
131

132
SWal *walOpen(const char *path, SWalCfg *pCfg) {
14,449✔
133
  int32_t code = 0;
14,449✔
134
  SWal   *pWal = taosMemoryCalloc(1, sizeof(SWal));
14,449!
135
  if (pWal == NULL) {
14,449!
136
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
137
    return NULL;
×
138
  }
139

140
  if (walInitLock(pWal) < 0) {
14,449!
141
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
142
    taosMemoryFree(pWal);
×
143
    return NULL;
×
144
  }
145

146
  // set config
147
  (void)memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
14,449✔
148

149
  pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
14,449✔
150
  if (pWal->cfg.retentionSize > 0) {
14,449✔
151
    pWal->cfg.retentionSize *= 1024;
167✔
152
  }
153

154
  if (pWal->cfg.segSize > 0) {
14,449!
155
    pWal->cfg.segSize *= 1024;
×
156
  }
157

158
  if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
14,449✔
159

160
  tstrncpy(pWal->path, path, sizeof(pWal->path));
14,449✔
161
  if (taosMkDir(pWal->path) != 0) {
14,449!
162
    wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, tstrerror(terrno));
×
163
    goto _err;
×
164
  }
165

166
  // init ref
167
  pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
14,448✔
168
  if (pWal->pRefHash == NULL) {
14,447!
169
    wError("vgId:%d, failed to init hash since %s", pWal->cfg.vgId, tstrerror(terrno));
×
170
    goto _err;
×
171
  }
172

173
  // open meta
174
  walResetVer(&pWal->vers);
14,447✔
175
  pWal->pLogFile = NULL;
14,447✔
176
  pWal->pIdxFile = NULL;
14,447✔
177
  pWal->writeCur = -1;
14,447✔
178
  pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
14,447✔
179
  if (pWal->fileInfoSet == NULL) {
14,448!
180
    wError("vgId:%d, failed to init taosArray of fileInfoSet since %s, path:%s", pWal->cfg.vgId, strerror(ERRNO),
×
181
           pWal->path);
182
    goto _err;
×
183
  }
184

185
  // init gc
186
  pWal->toDeleteFiles = taosArrayInit(8, sizeof(SWalFileInfo));
14,448✔
187
  if (pWal->toDeleteFiles == NULL) {
14,449!
188
    wError("vgId:%d, failed to init taosArray of toDeleteFiles since %s, path:%s", pWal->cfg.vgId, strerror(ERRNO),
×
189
           pWal->path);
190
    goto _err;
×
191
  }
192

193
  // init status
194
  pWal->totSize = 0;
14,449✔
195
  pWal->lastRollSeq = -1;
14,449✔
196

197
  // init write buffer
198
  (void)memset(&pWal->writeHead, 0, sizeof(SWalCkHead));
14,449✔
199
  pWal->writeHead.head.protoVer = WAL_PROTO_VER;
14,449✔
200
  pWal->writeHead.magic = WAL_MAGIC;
14,449✔
201

202
  // load meta
203
  code = walLoadMeta(pWal);
14,449✔
204
  if (code < 0) {
14,449✔
205
    wWarn("vgId:%d, failed to load meta, code:0x%x", pWal->cfg.vgId, code);
11,424✔
206
  }
207
  if (pWal->cfg.level != TAOS_WAL_SKIP) {
14,449✔
208
    code = walCheckAndRepairMeta(pWal);
14,437✔
209
    if (code < 0) {
14,436!
210
      wError("vgId:%d, cannot open wal since repair meta file failed since %s", pWal->cfg.vgId, tstrerror(code));
×
211
      goto _err;
×
212
    }
213

214
    code = walCheckAndRepairIdx(pWal);
14,436✔
215
    if (code < 0) {
14,436!
216
      wError("vgId:%d, cannot open wal since repair idx file failed since %s", pWal->cfg.vgId, tstrerror(code));
×
217
      goto _err;
×
218
    }
219
  } else {
220
    code = walInitWriteFileForSkip(pWal);
12✔
221
    if (code < 0) {
12!
222
      wError("vgId:%d, cannot open wal since init write file for wal_level = 0 failed since %s", pWal->cfg.vgId,
×
223
             tstrerror(code));
224
      goto _err;
×
225
    }
226
  }
227

228
  // add ref
229
  pWal->refId = taosAddRef(tsWal.refSetId, pWal);
14,448✔
230
  if (pWal->refId < 0) {
14,449!
231
    wError("failed to add ref for Wal since %s", tstrerror(terrno));
×
232
    goto _err;
×
233
  }
234

235
  pWal->stopDnode = tsWal.stopDnode;
14,449✔
236

237
  wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
14,449✔
238
         pWal->cfg.fsyncPeriod);
239
  return pWal;
14,448✔
240

241
_err:
×
242
  taosArrayDestroy(pWal->fileInfoSet);
×
243
  taosArrayDestroy(pWal->toDeleteFiles);
×
244
  taosHashCleanup(pWal->pRefHash);
×
245
  TAOS_UNUSED(taosThreadRwlockDestroy(&pWal->mutex));
×
246
  taosMemoryFreeClear(pWal);
×
247

248
  return NULL;
×
249
}
250

251
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
244✔
252
  if (pWal == NULL) TAOS_RETURN(TSDB_CODE_APP_ERROR);
244!
253

254
  if (pWal->cfg.level == pCfg->level && pWal->cfg.fsyncPeriod == pCfg->fsyncPeriod &&
244✔
255
      pWal->cfg.retentionPeriod == pCfg->retentionPeriod && pWal->cfg.retentionSize == pCfg->retentionSize) {
126!
256
    wDebug("vgId:%d, walLevel:%d fsync:%d walRetentionPeriod:%d walRetentionSize:%" PRId64 " not change",
×
257
           pWal->cfg.vgId, pWal->cfg.level, pWal->cfg.fsyncPeriod, pWal->cfg.retentionPeriod, pWal->cfg.retentionSize);
258

259
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
260
  }
261

262
  wInfo("vgId:%d, change old walLevel:%d fsync:%d walRetentionPeriod:%d walRetentionSize:%" PRId64
244!
263
        ", new walLevel:%d fsync:%d walRetentionPeriod:%d walRetentionSize:%" PRId64,
264
        pWal->cfg.vgId, pWal->cfg.level, pWal->cfg.fsyncPeriod, pWal->cfg.retentionPeriod, pWal->cfg.retentionSize,
265
        pCfg->level, pCfg->fsyncPeriod, pCfg->retentionPeriod, pCfg->retentionSize);
266

267
  if (pWal->cfg.level == TAOS_WAL_SKIP && pCfg->level != TAOS_WAL_SKIP) {
244!
268
    wInfo("vgId:%d, remove all wals, path:%s", pWal->cfg.vgId, pWal->path);
×
269
    taosRemoveDir(pWal->path);
×
270
    if (taosMkDir(pWal->path) != 0) {
×
271
      wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, tstrerror(terrno));
×
272
    }
273
  }
274

275
  pWal->cfg.level = pCfg->level;
244✔
276
  pWal->cfg.fsyncPeriod = pCfg->fsyncPeriod;
244✔
277
  pWal->cfg.retentionPeriod = pCfg->retentionPeriod;
244✔
278
  pWal->cfg.retentionSize = pCfg->retentionSize;
244✔
279

280
  pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
244✔
281
  if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
244✔
282

283
  TAOS_RETURN(TSDB_CODE_SUCCESS);
244✔
284
}
285

286
int32_t walPersist(SWal *pWal) {
22,437✔
287
  int32_t code = 0;
22,437✔
288

289
  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
22,437✔
290
  code = walSaveMeta(pWal);
22,437✔
291
  TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
22,437✔
292

293
  TAOS_RETURN(code);
22,437✔
294
}
295

296
void walClose(SWal *pWal) {
14,449✔
297
  TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
14,449✔
298
  if (walSaveMeta(pWal) < 0) {
14,449!
299
    wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, tstrerror(terrno));
×
300
  }
301
  TAOS_UNUSED(taosCloseFile(&pWal->pLogFile));
14,449✔
302
  pWal->pLogFile = NULL;
14,449✔
303
  (void)taosCloseFile(&pWal->pIdxFile);
14,449✔
304
  pWal->pIdxFile = NULL;
14,449✔
305
  taosArrayDestroy(pWal->fileInfoSet);
14,449✔
306
  pWal->fileInfoSet = NULL;
14,448✔
307
  taosArrayDestroy(pWal->toDeleteFiles);
14,448✔
308
  pWal->toDeleteFiles = NULL;
14,449✔
309

310
  void *pIter = NULL;
14,449✔
311
  while (1) {
3✔
312
    pIter = taosHashIterate(pWal->pRefHash, pIter);
14,452✔
313
    if (pIter == NULL) break;
14,452✔
314
    SWalRef *pRef = *(SWalRef **)pIter;
3✔
315
    taosMemoryFree(pRef);
3!
316
  }
317
  taosHashCleanup(pWal->pRefHash);
14,449✔
318
  pWal->pRefHash = NULL;
14,449✔
319
  (void)taosThreadRwlockUnlock(&pWal->mutex);
14,449✔
320

321
  if (pWal->cfg.level == TAOS_WAL_SKIP) {
14,449✔
322
    wInfo("vgId:%d, remove all wals, path:%s", pWal->cfg.vgId, pWal->path);
12!
323
    taosRemoveDir(pWal->path);
12✔
324
    if (taosMkDir(pWal->path) != 0) {
12!
325
      wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, tstrerror(terrno));
×
326
    }
327
  }
328

329
  if (taosRemoveRef(tsWal.refSetId, pWal->refId) < 0) {
14,449!
330
    wError("vgId:%d, failed to remove ref for Wal since %s", pWal->cfg.vgId, tstrerror(terrno));
×
331
  }
332
}
14,449✔
333

334
static void walFreeObj(void *wal) {
14,449✔
335
  SWal *pWal = wal;
14,449✔
336
  wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal);
14,449✔
337

338
  (void)taosThreadRwlockDestroy(&pWal->mutex);
14,449✔
339
  taosMemoryFreeClear(pWal);
14,449!
340
}
14,449✔
341

342
static bool walNeedFsync(SWal *pWal) {
340,727✔
343
  if (pWal->cfg.fsyncPeriod <= 0 || pWal->cfg.level != TAOS_WAL_FSYNC) {
340,727✔
344
    return false;
340,042✔
345
  }
346

347
  if (atomic_load_32((volatile int32_t *)&tsWal.seq) % pWal->fsyncSeq == 0) {
685✔
348
    return true;
402✔
349
  }
350

351
  return false;
283✔
352
}
353

354
static void walUpdateSeq() {
104,434✔
355
  taosMsleep(WAL_REFRESH_MS);
104,434✔
356
  if (atomic_add_fetch_32((volatile int32_t *)&tsWal.seq, 1) < 0) {
104,434!
357
    wError("failed to update wal seq since %s", strerror(ERRNO));
×
358
  }
359
}
104,434✔
360

361
static void walFsyncAll() {
104,434✔
362
  SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
104,434✔
363
  while (pWal) {
445,161✔
364
    if (walNeedFsync(pWal)) {
340,727✔
365
      wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq,
402!
366
             atomic_load_32((volatile int32_t *)&tsWal.seq));
367
      int32_t code = taosFsyncFile(pWal->pLogFile);
402✔
368
      if (code != 0) {
402!
369
        wError("vgId:%d, file:%" PRId64 ".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
×
370
               strerror(ERRNO));
371
      }
372
    }
373
    pWal = taosIterateRef(tsWal.refSetId, pWal->refId);
340,727✔
374
  }
375
}
104,434✔
376

377
static void *walThreadFunc(void *param) {
2,403✔
378
  setThreadName("wal");
2,403✔
379
  while (1) {
380
    walUpdateSeq();
104,434✔
381
    walFsyncAll();
104,434✔
382

383
    if (atomic_load_8(&tsWal.stop)) break;
104,434✔
384
  }
385

386
  return NULL;
2,403✔
387
}
388

389
static int32_t walCreateThread() {
2,403✔
390
  TdThreadAttr thAttr;
391
  (void)taosThreadAttrInit(&thAttr);
2,403✔
392
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
2,403✔
393
#ifdef TD_COMPACT_OS
394
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
395
#endif
396
  if (taosThreadCreate(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) {
2,403!
397
    wError("failed to create wal thread since %s", strerror(ERRNO));
×
398

399
    TAOS_RETURN(TAOS_SYSTEM_ERROR(ERRNO));
×
400
  }
401

402
  (void)taosThreadAttrDestroy(&thAttr);
2,403✔
403
  wDebug("wal thread is launched, thread:0x%08" PRIx64, taosGetPthreadId(tsWal.thread));
2,403✔
404

405
  TAOS_RETURN(TSDB_CODE_SUCCESS);
2,403✔
406
}
407

408
static void walStopThread() {
2,403✔
409
  atomic_store_8(&tsWal.stop, 1);
2,403✔
410

411
  if (taosCheckPthreadValid(tsWal.thread)) {
2,403!
412
    (void)taosThreadJoin(tsWal.thread, NULL);
2,403✔
413
    taosThreadClear(&tsWal.thread);
2,403✔
414
  }
415

416
  wDebug("wal thread is stopped");
2,403✔
417
}
2,403✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc